Files
postgenerator/backend/services/generation_pipeline.py
Michele 9e7205eca2 feat(04-01): integrazione Unsplash in pipeline + CSVBuilder + export
- CSVBuilder.build_csv() e build_csv_content() accettano image_url_map opzionale
- _resolve_image() risolve keyword->URL Unsplash con fallback keyword originale
- _build_rows() chiama _resolve_image per cover, slides e cta image keywords
- JobStatus ha campo image_url_map con persistenza su disco JSON
- GenerationPipeline._resolve_unsplash_keywords() chiamato dopo batch LLM
- Carica unsplash_api_key da settings.json, crea UnsplashService, chiama resolve_keywords
- image_url_map salvato nel job JSON per riuso in export con edits
- Export router recupera image_url_map dal job JSON e passa a build_csv_content
- generate_single NON risolve Unsplash (velocità e riuso map job originale)
2026-03-09 08:10:06 +01:00

686 lines
24 KiB
Python

"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation.
Gestisce:
- Generazione async in background (asyncio.create_task)
- Progresso real-time tracciato in _jobs dict (completed/total/current_post)
- Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5)
- Persistenza su disco per ricaricamento dopo restart
- Job status per polling dal frontend
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional
from backend.config import DATA_PATH
from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot
from backend.schemas.generate import (
GenerateResponse,
GeneratedPost,
PostResult,
TopicResult,
)
from backend.services.calendar_service import CalendarService
from backend.services.csv_builder import CSVBuilder
from backend.services.format_selector import FormatSelector
from backend.services.llm_service import LLMService
from backend.services.prompt_service import PromptService
from backend.services.unsplash_service import UnsplashService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mappa formato_narrativo -> nome file prompt
# ---------------------------------------------------------------------------
_FORMAT_TO_PROMPT: dict[str, str] = {
"PAS": "pas_valore",
"AIDA": "aida_promozione",
"BAB": "bab_storytelling",
"Listicle": "listicle_valore",
"Storytelling": "bab_storytelling", # fallback su BAB per storytelling
"Dato_Implicazione": "dato_news",
"Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta
}
_DEFAULT_PROMPT = "pas_valore"
# ---------------------------------------------------------------------------
# Dataclass per tracciare lo stato di un job
# ---------------------------------------------------------------------------
@dataclass
class JobStatus:
"""Stato real-time di un job di generazione batch."""
job_id: str
status: Literal["running", "completed", "failed"]
total: int
completed: int
current_post: int
results: list[PostResult] = field(default_factory=list)
calendar: Optional[CalendarResponse] = None
error: Optional[str] = None
campagna: str = ""
image_url_map: Optional[dict[str, str]] = None
"""Mappa keyword->URL Unsplash risolta dopo la generazione batch. None se Unsplash non configurato."""
# ---------------------------------------------------------------------------
# Pipeline principale
# ---------------------------------------------------------------------------
class GenerationPipeline:
"""Orchestra il flusso completo: calendario -> topic -> LLM -> CSV."""
def __init__(
self,
llm_service: LLMService,
prompt_service: PromptService,
calendar_service: CalendarService,
format_selector: FormatSelector,
csv_builder: CSVBuilder,
outputs_path: Path,
) -> None:
"""Inizializza la pipeline con i servizi necessari.
Args:
llm_service: Servizio per chiamate LLM con retry.
prompt_service: Servizio per caricare/compilare prompt .txt.
calendar_service: Servizio per generare il calendario editoriale.
format_selector: Selettore formato narrativo per slot.
csv_builder: Builder per CSV Canva-compatibile.
outputs_path: Directory dove salvare i file CSV e JSON dei job.
"""
self._llm = llm_service
self._prompts = prompt_service
self._calendar = calendar_service
self._formats = format_selector
self._csv = csv_builder
self._outputs_path = outputs_path
# Dizionario in-memory per lo stato real-time dei job
self._jobs: dict[str, JobStatus] = {}
# ---------------------------------------------------------------------------
# API pubblica
# ---------------------------------------------------------------------------
def generate_bulk_async(
self,
request: CalendarRequest,
) -> str:
"""Avvia la generazione batch come background task e ritorna subito.
Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id,
inizializza lo stato, lancia il background task, e ritorna immediatamente.
Il frontend può fare polling su get_job_status() per seguire il progresso.
Args:
request: Parametri del calendario (obiettivo, nicchie, etc.).
Returns:
job_id (UUID stringa) da usare per polling e download CSV.
"""
job_id = str(uuid.uuid4())
# Genera il calendario in modo sincrono (è CPU-bound, veloce)
calendar = self._calendar.generate_calendar(request)
# Inizializza stato job
self._jobs[job_id] = JobStatus(
job_id=job_id,
status="running",
total=len(calendar.slots),
completed=0,
current_post=0,
results=[],
calendar=calendar,
campagna=request.obiettivo_campagna,
)
logger.info(
"Job avviato | job_id=%s | slot_totali=%d | campagna=%s",
job_id,
len(calendar.slots),
request.obiettivo_campagna[:50],
)
# Lancia il background task (non blocca)
asyncio.create_task(
self._run_generation(job_id, calendar, request)
)
return job_id
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
"""Ritorna lo stato corrente del job per polling.
Se il job non è in memoria, prova a caricarlo dal file JSON su disco
(per supportare restart del server).
Args:
job_id: Identificatore del job.
Returns:
JobStatus aggiornato, o None se il job non esiste.
"""
if job_id in self._jobs:
return self._jobs[job_id]
# Prova a caricare da disco
return self._load_job_from_disk(job_id)
def get_job_results(self, job_id: str) -> Optional[GenerateResponse]:
"""Ritorna i risultati completi di un job completato.
Args:
job_id: Identificatore del job.
Returns:
GenerateResponse con tutti i PostResult, o None se non trovato.
"""
status = self.get_job_status(job_id)
if status is None:
return None
success_count = sum(1 for r in status.results if r.status == "success")
failed_count = sum(1 for r in status.results if r.status == "failed")
return GenerateResponse(
campagna=status.campagna,
results=status.results,
total=status.total,
success_count=success_count,
failed_count=failed_count,
calendar=status.calendar,
)
def generate_single(
self,
slot: CalendarSlot,
obiettivo_campagna: str,
brand_name: Optional[str] = None,
tono: Optional[str] = None,
) -> PostResult:
"""Genera un singolo post per uno slot.
Utile per rigenerare post falliti o per test.
Args:
slot: Slot del calendario con metadati strategici.
obiettivo_campagna: Obiettivo principale della campagna.
brand_name: Nome del brand (opzionale).
tono: Tono di voce (opzionale).
Returns:
PostResult con status="success" o "failed".
"""
system_prompt = self._prompts.load_prompt("system_prompt")
try:
# Genera topic se mancante
topic = slot.topic
if not topic:
topic = self._llm.generate_topic(
system_prompt=system_prompt,
obiettivo_campagna=obiettivo_campagna,
tipo_contenuto=slot.tipo_contenuto,
target_nicchia=slot.target_nicchia,
fase_campagna=slot.fase_campagna,
livello_schwartz=slot.livello_schwartz,
prompt_service=self._prompts,
)
# Genera il post
post = self._generate_post_for_slot(
slot=slot,
topic=topic,
obiettivo_campagna=obiettivo_campagna,
brand_name=brand_name or "il tuo brand",
tono=tono or "diretto e concreto",
system_prompt=system_prompt,
)
return PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
except Exception as e:
logger.error(
"Errore generazione singola | slot_index=%d | errore=%s",
slot.indice,
str(e),
)
return PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# ---------------------------------------------------------------------------
# Background task
# ---------------------------------------------------------------------------
async def _run_generation(
self,
job_id: str,
calendar: CalendarResponse,
request: CalendarRequest,
) -> None:
"""Task asincrono che genera tutti i post del batch.
Esegue il loop su ogni slot del calendario con per-item error isolation:
ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop.
Args:
job_id: Identificatore del job.
calendar: Calendario con gli slot da generare.
request: Parametri della richiesta originale.
"""
job = self._jobs[job_id]
system_prompt = self._prompts.load_prompt("system_prompt")
brand_name = "il tuo brand" # Verrà preso da settings in fase successiva
tono = "diretto e concreto"
try:
for slot in calendar.slots:
# Aggiorna progresso real-time
job.current_post = slot.indice
# CRITICO Pitfall 5: try/except INDIVIDUALE per slot
# Un fallimento NON blocca il batch
try:
logger.info(
"Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s",
job_id,
slot.indice + 1,
job.total,
slot.tipo_contenuto,
slot.target_nicchia,
)
# Genera topic se non presente nello slot.
# Controlla prima gli override passati dall'utente dallo Swipe File.
topic = slot.topic
if not topic and request.topic_overrides and slot.indice in request.topic_overrides:
topic = request.topic_overrides[slot.indice]
logger.info(
"Topic override applicato | job_id=%s | slot=%d | topic=%s",
job_id,
slot.indice,
topic[:60],
)
if not topic:
# Usa asyncio.to_thread per non bloccare l'event loop
# (generate_topic usa time.sleep per inter_request_delay)
topic = await asyncio.to_thread(
self._llm.generate_topic,
system_prompt,
request.obiettivo_campagna,
slot.tipo_contenuto,
slot.target_nicchia,
slot.fase_campagna,
slot.livello_schwartz,
self._prompts,
)
# Genera il post completo
post = await asyncio.to_thread(
self._generate_post_for_slot,
slot,
topic,
request.obiettivo_campagna,
brand_name,
tono,
system_prompt,
)
post_result = PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
logger.info(
"Slot completato | job_id=%s | slot=%d | status=success",
job_id,
slot.indice,
)
except Exception as e:
# Fallimento isolato — logga e continua con il prossimo slot
logger.error(
"Errore slot | job_id=%s | slot=%d | errore=%s",
job_id,
slot.indice,
str(e),
)
post_result = PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# Aggiorna progresso (sia per success che per failed)
job.results.append(post_result)
job.completed += 1
# Risolvi keyword immagine via Unsplash (se API key configurata)
image_url_map: Optional[dict[str, str]] = None
try:
image_url_map = await self._resolve_unsplash_keywords(job)
job.image_url_map = image_url_map
except Exception as e:
logger.warning(
"Unsplash resolution fallita | job_id=%s | errore=%s — continuo con keyword testuali",
job_id,
str(e),
)
# Genera CSV con i risultati
success_results = [r for r in job.results if r.status == "success"]
if success_results:
self._outputs_path.mkdir(parents=True, exist_ok=True)
self._csv.build_csv(
posts=job.results,
calendar=calendar,
job_id=job_id,
output_dir=self._outputs_path,
image_url_map=image_url_map,
)
logger.info(
"CSV generato | job_id=%s | success=%d/%d | url_unsplash=%d",
job_id,
len(success_results),
job.total,
len(image_url_map) if image_url_map else 0,
)
# Salva metadata job su disco per persistenza
self._save_job_to_disk(job)
# Aggiorna stato finale
job.status = "completed"
logger.info(
"Job completato | job_id=%s | success=%d | failed=%d",
job_id,
sum(1 for r in job.results if r.status == "success"),
sum(1 for r in job.results if r.status == "failed"),
)
except Exception as e:
# Errore catastrofico (non isolabile a un singolo slot)
job.status = "failed"
job.error = str(e)
logger.error(
"Job fallito | job_id=%s | errore=%s",
job_id,
str(e),
exc_info=True,
)
# ---------------------------------------------------------------------------
# Generazione singolo post
# ---------------------------------------------------------------------------
def _generate_post_for_slot(
self,
slot: CalendarSlot,
topic: str,
obiettivo_campagna: str,
brand_name: str,
tono: str,
system_prompt: str,
) -> GeneratedPost:
"""Genera il contenuto completo di un post dato lo slot e il topic.
Seleziona il prompt template corretto in base al formato_narrativo,
compila il prompt con le variabili, chiama l'LLM.
Args:
slot: Slot con metadati strategici.
topic: Topic specifico del post (già generato).
obiettivo_campagna: Obiettivo della campagna.
brand_name: Nome del brand.
tono: Tono di voce.
system_prompt: System prompt caricato.
Returns:
GeneratedPost validato con Pydantic.
"""
prompt_name = self._select_prompt_template(
slot.formato_narrativo, slot.tipo_contenuto
)
# Verifica che il prompt template esista
if not self._prompts.prompt_exists(prompt_name):
logger.warning(
"Prompt '%s' non trovato, uso fallback '%s'",
prompt_name,
_DEFAULT_PROMPT,
)
prompt_name = _DEFAULT_PROMPT
# Prepara le variabili comuni a tutti i prompt
variables: dict[str, str] = {
"obiettivo_campagna": obiettivo_campagna,
"topic": topic,
"target_nicchia": slot.target_nicchia,
"livello_schwartz": slot.livello_schwartz,
"brand_name": brand_name,
}
# Aggiungi variabili specifiche per alcuni prompt
# aida_promozione ha {{call_to_action}} come variabile aggiuntiva
if "call_to_action" in self._prompts.get_required_variables(prompt_name):
variables["call_to_action"] = f"Scopri di più su {brand_name}"
user_prompt = self._prompts.compile_prompt(prompt_name, variables)
post = self._llm.generate(
system_prompt=system_prompt,
user_prompt=user_prompt,
response_schema=GeneratedPost,
)
return post
def _select_prompt_template(self, formato: str, tipo: str) -> str:
"""Mappa formato_narrativo + tipo_contenuto al nome del file prompt.
Logica di selezione:
1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT
2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT
Args:
formato: Formato narrativo (es. "PAS", "AIDA", "BAB").
tipo: Tipo di contenuto (es. "valore", "promozione").
Returns:
Nome del file prompt senza estensione (es. "pas_valore").
"""
# Prova prima la combinazione formato + tipo (per future specializzazioni)
combined_key = f"{formato.lower()}_{tipo.lower()}"
if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}:
return combined_key
# Altrimenti usa la mappa formato -> prompt
return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT)
# ---------------------------------------------------------------------------
# Integrazione Unsplash
# ---------------------------------------------------------------------------
async def _resolve_unsplash_keywords(
self,
job: JobStatus,
) -> Optional[dict[str, str]]:
"""Risolve le keyword immagine dei post in URL Unsplash.
Carica la settings da disco per verificare se unsplash_api_key e' configurata.
Se non e' configurata, ritorna None (nessuna risoluzione, usa keyword testuali).
Estrae tutte le keyword uniche dai PostResult success:
- cover_image_keyword
- slide.image_keyword per ogni slide
- cta_image_keyword
Args:
job: JobStatus con i risultati generati.
Returns:
Dizionario {keyword: url} per le keyword risolte, o None se Unsplash non configurato.
"""
import json as _json
# Carica settings per controllare unsplash_api_key
settings_path = DATA_PATH / "config" / "settings.json"
unsplash_api_key: Optional[str] = None
if settings_path.exists():
try:
data = _json.loads(settings_path.read_text(encoding="utf-8"))
unsplash_api_key = data.get("unsplash_api_key")
except Exception as e:
logger.warning("Errore lettura settings per Unsplash: %s", str(e))
if not unsplash_api_key:
logger.debug("unsplash_api_key non configurata — skip risoluzione keyword")
return None
# Estrai tutte le keyword uniche dai post success
keywords: list[str] = []
for post_result in job.results:
if post_result.status != "success" or post_result.post is None:
continue
post = post_result.post
if post.cover_image_keyword:
keywords.append(post.cover_image_keyword)
for slide in post.slides:
if slide.image_keyword:
keywords.append(slide.image_keyword)
if post.cta_image_keyword:
keywords.append(post.cta_image_keyword)
if not keywords:
logger.debug("Nessuna keyword immagine trovata nei post")
return None
logger.info(
"Avvio risoluzione Unsplash | job_id=%s | keyword_totali=%d",
job.job_id,
len(keywords),
)
# Crea e usa UnsplashService
unsplash_cache_path = DATA_PATH / "unsplash_cache.json"
unsplash = UnsplashService(
api_key=unsplash_api_key,
cache_path=unsplash_cache_path,
)
try:
image_url_map = await unsplash.resolve_keywords(keywords)
logger.info(
"Risoluzione Unsplash completata | job_id=%s | risolte=%d/%d",
job.job_id,
len(image_url_map),
len(set(keywords)),
)
return image_url_map if image_url_map else None
finally:
await unsplash.close()
# ---------------------------------------------------------------------------
# Persistenza su disco
# ---------------------------------------------------------------------------
def _save_job_to_disk(self, job: JobStatus) -> None:
"""Salva i metadata del job in JSON per ricaricamento dopo restart.
Args:
job: JobStatus da persistere.
"""
self._outputs_path.mkdir(parents=True, exist_ok=True)
job_path = self._outputs_path / f"{job.job_id}.json"
# Serializza con Pydantic per gestire Optional e Literal
data = {
"job_id": job.job_id,
"status": job.status,
"total": job.total,
"completed": job.completed,
"current_post": job.current_post,
"campagna": job.campagna,
"error": job.error,
"results": [r.model_dump() for r in job.results],
"calendar": job.calendar.model_dump() if job.calendar else None,
"image_url_map": job.image_url_map,
}
with open(job_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path)
def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]:
"""Carica il job dal file JSON su disco.
Args:
job_id: Identificatore del job.
Returns:
JobStatus ricostruito, o None se il file non esiste.
"""
from backend.schemas.calendar import CalendarResponse
job_path = self._outputs_path / f"{job_id}.json"
if not job_path.exists():
return None
try:
with open(job_path, "r", encoding="utf-8") as f:
data = json.load(f)
results = [PostResult.model_validate(r) for r in data.get("results", [])]
calendar = (
CalendarResponse.model_validate(data["calendar"])
if data.get("calendar")
else None
)
job_status = JobStatus(
job_id=data["job_id"],
status=data["status"],
total=data["total"],
completed=data["completed"],
current_post=data.get("current_post", 0),
results=results,
calendar=calendar,
error=data.get("error"),
campagna=data.get("campagna", ""),
image_url_map=data.get("image_url_map"),
)
# Metti in memoria per accesso futuro
self._jobs[job_id] = job_status
return job_status
except Exception as e:
logger.error(
"Errore caricamento job da disco | job_id=%s | errore=%s",
job_id,
str(e),
)
return None