feat(01-03): LLMService, CSVBuilder, GenerationPipeline

- LLMService: retry 3x, RateLimitError legge retry-after header, 5xx backoff esponenziale, ValidationError riprova con istruzione correttiva, inter_request_delay 2s
- LLMService.generate_topic(): usa TopicResult come response_schema (passa per loop retry/validation)
- CSVBuilder: encoding utf-8-sig, header CANVA_FIELDS locked, mappa GeneratedPost+CalendarSlot -> 33 colonne
- GenerationPipeline: background task asyncio.create_task, _jobs dict progresso real-time, per-item try/except individuale, persistenza JSON su disco
This commit is contained in:
Michele
2026-03-08 02:10:28 +01:00
parent f9c452586f
commit 083621afd3
3 changed files with 1036 additions and 0 deletions

View File

@@ -0,0 +1,181 @@
"""CSVBuilder — produce CSV compatibile con Canva Bulk Create.
Caratteristiche:
- Encoding utf-8-sig (BOM) — critico per Excel + caratteri italiani (Pitfall 3)
- Header CANVA_FIELDS locked — 33 colonne esatte
- Mappa GeneratedPost + CalendarSlot -> riga CSV
- Filtra solo PostResult con status="success"
- Scrive su disco in OUTPUTS_PATH/{job_id}.csv
"""
from __future__ import annotations
import csv
import io
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from backend.constants import CANVA_FIELDS
from backend.schemas.generate import PostResult
if TYPE_CHECKING:
from backend.schemas.calendar import CalendarResponse
logger = logging.getLogger(__name__)
class CSVBuilder:
"""Costruisce file CSV Canva-compatibili dai risultati di generazione."""
def build_csv(
self,
posts: list[PostResult],
calendar: "CalendarResponse",
job_id: str,
output_dir: Path,
) -> Path:
"""Genera e scrive il CSV su disco.
Filtra solo i PostResult con status="success", mappa i dati
GeneratedPost + CalendarSlot alle 33 colonne CANVA_FIELDS,
e scrive con encoding utf-8-sig per compatibilità Excel.
Args:
posts: Lista di PostResult (include success e failed).
calendar: CalendarResponse con i metadati degli slot.
job_id: Identificatore univoco del job (usato come nome file).
output_dir: Directory dove scrivere il file CSV.
Returns:
Path del file CSV scritto su disco.
"""
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{job_id}.csv"
rows = self._build_rows(posts, calendar)
# CRITICO: encoding utf-8-sig (BOM) per Excel + caratteri italiani
with open(output_path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=CANVA_FIELDS, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
logger.info(
"CSV scritto | job_id=%s | righe_success=%d | path=%s",
job_id,
len(rows),
output_path,
)
return output_path
def build_csv_content(
self,
posts: list[PostResult],
calendar: "CalendarResponse",
job_id: str,
) -> str:
"""Genera il CSV come stringa (senza scrivere su disco).
Usato per preview e per la route POST /export/{job_id}/csv
con dati modificati inline dall'utente.
Args:
posts: Lista di PostResult (include success e failed).
calendar: CalendarResponse con i metadati degli slot.
job_id: Identificatore univoco del job.
Returns:
Stringa CSV con encoding utf-8-sig (BOM).
"""
rows = self._build_rows(posts, calendar)
output = io.StringIO()
# Aggiungi BOM manualmente per compatibilità Excel
output.write("\ufeff")
writer = csv.DictWriter(output, fieldnames=CANVA_FIELDS, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
return output.getvalue()
# ---------------------------------------------------------------------------
# Metodi privati
# ---------------------------------------------------------------------------
def _build_rows(
self,
posts: list[PostResult],
calendar: "CalendarResponse",
) -> list[dict[str, str]]:
"""Costruisce la lista di righe CSV dai risultati.
Filtra solo i post con status="success" e mappa i dati
GeneratedPost + CalendarSlot alle colonne CANVA_FIELDS.
Args:
posts: Lista completa di PostResult.
calendar: CalendarResponse con i metadati degli slot.
Returns:
Lista di dict con chiavi = CANVA_FIELDS.
"""
# Crea un dizionario slot_index -> CalendarSlot per lookup veloce
slot_map = {slot.indice: slot for slot in calendar.slots}
rows: list[dict[str, str]] = []
for post_result in posts:
if post_result.status != "success" or post_result.post is None:
continue
slot_index = post_result.slot_index
slot = slot_map.get(slot_index)
if slot is None:
logger.warning(
"Slot non trovato per slot_index=%d, skip", slot_index
)
continue
post = post_result.post
row: dict[str, str] = {}
# --- Metadati slot (8 colonne) ---
row["campagna"] = calendar.campagna
row["fase_campagna"] = slot.fase_campagna
row["tipo_contenuto"] = slot.tipo_contenuto
row["formato_narrativo"] = slot.formato_narrativo
row["funzione"] = slot.funzione
row["livello_schwartz"] = slot.livello_schwartz
row["target_nicchia"] = slot.target_nicchia
row["data_pub_suggerita"] = slot.data_pub_suggerita
# --- Cover slide (3 colonne) ---
row["cover_title"] = post.cover_title
row["cover_subtitle"] = post.cover_subtitle
row["cover_image_keyword"] = post.cover_image_keyword
# --- Slide centrali s2-s7 (6 slide x 3 colonne = 18 colonne) ---
slide_labels = ["s2", "s3", "s4", "s5", "s6", "s7"]
for idx, label in enumerate(slide_labels):
if idx < len(post.slides):
slide = post.slides[idx]
row[f"{label}_headline"] = slide.headline
row[f"{label}_body"] = slide.body
row[f"{label}_image_keyword"] = slide.image_keyword
else:
# Fallback se slides ha meno di 6 elementi (non dovrebbe accadere)
row[f"{label}_headline"] = ""
row[f"{label}_body"] = ""
row[f"{label}_image_keyword"] = ""
# --- CTA slide (3 colonne) ---
row["cta_text"] = post.cta_text
row["cta_subtext"] = post.cta_subtext
row["cta_image_keyword"] = post.cta_image_keyword
# --- Caption Instagram (1 colonna) ---
row["caption_instagram"] = post.caption_instagram
rows.append(row)
return rows

View File

@@ -0,0 +1,571 @@
"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation.
Gestisce:
- Generazione async in background (asyncio.create_task)
- Progresso real-time tracciato in _jobs dict (completed/total/current_post)
- Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5)
- Persistenza su disco per ricaricamento dopo restart
- Job status per polling dal frontend
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional
from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot
from backend.schemas.generate import (
GenerateResponse,
GeneratedPost,
PostResult,
TopicResult,
)
from backend.services.calendar_service import CalendarService
from backend.services.csv_builder import CSVBuilder
from backend.services.format_selector import FormatSelector
from backend.services.llm_service import LLMService
from backend.services.prompt_service import PromptService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mappa formato_narrativo -> nome file prompt
# ---------------------------------------------------------------------------
_FORMAT_TO_PROMPT: dict[str, str] = {
"PAS": "pas_valore",
"AIDA": "aida_promozione",
"BAB": "bab_storytelling",
"Listicle": "listicle_valore",
"Storytelling": "bab_storytelling", # fallback su BAB per storytelling
"Dato_Implicazione": "dato_news",
"Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta
}
_DEFAULT_PROMPT = "pas_valore"
# ---------------------------------------------------------------------------
# Dataclass per tracciare lo stato di un job
# ---------------------------------------------------------------------------
@dataclass
class JobStatus:
"""Stato real-time di un job di generazione batch."""
job_id: str
status: Literal["running", "completed", "failed"]
total: int
completed: int
current_post: int
results: list[PostResult] = field(default_factory=list)
calendar: Optional[CalendarResponse] = None
error: Optional[str] = None
campagna: str = ""
# ---------------------------------------------------------------------------
# Pipeline principale
# ---------------------------------------------------------------------------
class GenerationPipeline:
"""Orchestra il flusso completo: calendario -> topic -> LLM -> CSV."""
def __init__(
self,
llm_service: LLMService,
prompt_service: PromptService,
calendar_service: CalendarService,
format_selector: FormatSelector,
csv_builder: CSVBuilder,
outputs_path: Path,
) -> None:
"""Inizializza la pipeline con i servizi necessari.
Args:
llm_service: Servizio per chiamate LLM con retry.
prompt_service: Servizio per caricare/compilare prompt .txt.
calendar_service: Servizio per generare il calendario editoriale.
format_selector: Selettore formato narrativo per slot.
csv_builder: Builder per CSV Canva-compatibile.
outputs_path: Directory dove salvare i file CSV e JSON dei job.
"""
self._llm = llm_service
self._prompts = prompt_service
self._calendar = calendar_service
self._formats = format_selector
self._csv = csv_builder
self._outputs_path = outputs_path
# Dizionario in-memory per lo stato real-time dei job
self._jobs: dict[str, JobStatus] = {}
# ---------------------------------------------------------------------------
# API pubblica
# ---------------------------------------------------------------------------
def generate_bulk_async(
self,
request: CalendarRequest,
) -> str:
"""Avvia la generazione batch come background task e ritorna subito.
Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id,
inizializza lo stato, lancia il background task, e ritorna immediatamente.
Il frontend può fare polling su get_job_status() per seguire il progresso.
Args:
request: Parametri del calendario (obiettivo, nicchie, etc.).
Returns:
job_id (UUID stringa) da usare per polling e download CSV.
"""
job_id = str(uuid.uuid4())
# Genera il calendario in modo sincrono (è CPU-bound, veloce)
calendar = self._calendar.generate_calendar(request)
# Inizializza stato job
self._jobs[job_id] = JobStatus(
job_id=job_id,
status="running",
total=len(calendar.slots),
completed=0,
current_post=0,
results=[],
calendar=calendar,
campagna=request.obiettivo_campagna,
)
logger.info(
"Job avviato | job_id=%s | slot_totali=%d | campagna=%s",
job_id,
len(calendar.slots),
request.obiettivo_campagna[:50],
)
# Lancia il background task (non blocca)
asyncio.create_task(
self._run_generation(job_id, calendar, request)
)
return job_id
def get_job_status(self, job_id: str) -> Optional[JobStatus]:
"""Ritorna lo stato corrente del job per polling.
Se il job non è in memoria, prova a caricarlo dal file JSON su disco
(per supportare restart del server).
Args:
job_id: Identificatore del job.
Returns:
JobStatus aggiornato, o None se il job non esiste.
"""
if job_id in self._jobs:
return self._jobs[job_id]
# Prova a caricare da disco
return self._load_job_from_disk(job_id)
def get_job_results(self, job_id: str) -> Optional[GenerateResponse]:
"""Ritorna i risultati completi di un job completato.
Args:
job_id: Identificatore del job.
Returns:
GenerateResponse con tutti i PostResult, o None se non trovato.
"""
status = self.get_job_status(job_id)
if status is None:
return None
success_count = sum(1 for r in status.results if r.status == "success")
failed_count = sum(1 for r in status.results if r.status == "failed")
return GenerateResponse(
campagna=status.campagna,
results=status.results,
total=status.total,
success_count=success_count,
failed_count=failed_count,
)
def generate_single(
self,
slot: CalendarSlot,
obiettivo_campagna: str,
brand_name: Optional[str] = None,
tono: Optional[str] = None,
) -> PostResult:
"""Genera un singolo post per uno slot.
Utile per rigenerare post falliti o per test.
Args:
slot: Slot del calendario con metadati strategici.
obiettivo_campagna: Obiettivo principale della campagna.
brand_name: Nome del brand (opzionale).
tono: Tono di voce (opzionale).
Returns:
PostResult con status="success" o "failed".
"""
system_prompt = self._prompts.load_prompt("system_prompt")
try:
# Genera topic se mancante
topic = slot.topic
if not topic:
topic = self._llm.generate_topic(
system_prompt=system_prompt,
obiettivo_campagna=obiettivo_campagna,
tipo_contenuto=slot.tipo_contenuto,
target_nicchia=slot.target_nicchia,
fase_campagna=slot.fase_campagna,
livello_schwartz=slot.livello_schwartz,
prompt_service=self._prompts,
)
# Genera il post
post = self._generate_post_for_slot(
slot=slot,
topic=topic,
obiettivo_campagna=obiettivo_campagna,
brand_name=brand_name or "il tuo brand",
tono=tono or "diretto e concreto",
system_prompt=system_prompt,
)
return PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
except Exception as e:
logger.error(
"Errore generazione singola | slot_index=%d | errore=%s",
slot.indice,
str(e),
)
return PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# ---------------------------------------------------------------------------
# Background task
# ---------------------------------------------------------------------------
async def _run_generation(
self,
job_id: str,
calendar: CalendarResponse,
request: CalendarRequest,
) -> None:
"""Task asincrono che genera tutti i post del batch.
Esegue il loop su ogni slot del calendario con per-item error isolation:
ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop.
Args:
job_id: Identificatore del job.
calendar: Calendario con gli slot da generare.
request: Parametri della richiesta originale.
"""
job = self._jobs[job_id]
system_prompt = self._prompts.load_prompt("system_prompt")
brand_name = "il tuo brand" # Verrà preso da settings in fase successiva
tono = "diretto e concreto"
try:
for slot in calendar.slots:
# Aggiorna progresso real-time
job.current_post = slot.indice
# CRITICO Pitfall 5: try/except INDIVIDUALE per slot
# Un fallimento NON blocca il batch
try:
logger.info(
"Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s",
job_id,
slot.indice + 1,
job.total,
slot.tipo_contenuto,
slot.target_nicchia,
)
# Genera topic se non presente nello slot
topic = slot.topic
if not topic:
# Usa asyncio.to_thread per non bloccare l'event loop
# (generate_topic usa time.sleep per inter_request_delay)
topic = await asyncio.to_thread(
self._llm.generate_topic,
system_prompt,
request.obiettivo_campagna,
slot.tipo_contenuto,
slot.target_nicchia,
slot.fase_campagna,
slot.livello_schwartz,
self._prompts,
)
# Genera il post completo
post = await asyncio.to_thread(
self._generate_post_for_slot,
slot,
topic,
request.obiettivo_campagna,
brand_name,
tono,
system_prompt,
)
post_result = PostResult(
slot_index=slot.indice,
status="success",
post=post,
)
logger.info(
"Slot completato | job_id=%s | slot=%d | status=success",
job_id,
slot.indice,
)
except Exception as e:
# Fallimento isolato — logga e continua con il prossimo slot
logger.error(
"Errore slot | job_id=%s | slot=%d | errore=%s",
job_id,
slot.indice,
str(e),
)
post_result = PostResult(
slot_index=slot.indice,
status="failed",
error=str(e),
)
# Aggiorna progresso (sia per success che per failed)
job.results.append(post_result)
job.completed += 1
# Genera CSV con i risultati
success_results = [r for r in job.results if r.status == "success"]
if success_results:
self._outputs_path.mkdir(parents=True, exist_ok=True)
self._csv.build_csv(
posts=job.results,
calendar=calendar,
job_id=job_id,
output_dir=self._outputs_path,
)
logger.info(
"CSV generato | job_id=%s | success=%d/%d",
job_id,
len(success_results),
job.total,
)
# Salva metadata job su disco per persistenza
self._save_job_to_disk(job)
# Aggiorna stato finale
job.status = "completed"
logger.info(
"Job completato | job_id=%s | success=%d | failed=%d",
job_id,
sum(1 for r in job.results if r.status == "success"),
sum(1 for r in job.results if r.status == "failed"),
)
except Exception as e:
# Errore catastrofico (non isolabile a un singolo slot)
job.status = "failed"
job.error = str(e)
logger.error(
"Job fallito | job_id=%s | errore=%s",
job_id,
str(e),
exc_info=True,
)
# ---------------------------------------------------------------------------
# Generazione singolo post
# ---------------------------------------------------------------------------
def _generate_post_for_slot(
self,
slot: CalendarSlot,
topic: str,
obiettivo_campagna: str,
brand_name: str,
tono: str,
system_prompt: str,
) -> GeneratedPost:
"""Genera il contenuto completo di un post dato lo slot e il topic.
Seleziona il prompt template corretto in base al formato_narrativo,
compila il prompt con le variabili, chiama l'LLM.
Args:
slot: Slot con metadati strategici.
topic: Topic specifico del post (già generato).
obiettivo_campagna: Obiettivo della campagna.
brand_name: Nome del brand.
tono: Tono di voce.
system_prompt: System prompt caricato.
Returns:
GeneratedPost validato con Pydantic.
"""
prompt_name = self._select_prompt_template(
slot.formato_narrativo, slot.tipo_contenuto
)
# Verifica che il prompt template esista
if not self._prompts.prompt_exists(prompt_name):
logger.warning(
"Prompt '%s' non trovato, uso fallback '%s'",
prompt_name,
_DEFAULT_PROMPT,
)
prompt_name = _DEFAULT_PROMPT
# Prepara le variabili comuni a tutti i prompt
variables: dict[str, str] = {
"obiettivo_campagna": obiettivo_campagna,
"topic": topic,
"target_nicchia": slot.target_nicchia,
"livello_schwartz": slot.livello_schwartz,
"brand_name": brand_name,
}
# Aggiungi variabili specifiche per alcuni prompt
# aida_promozione ha {{call_to_action}} come variabile aggiuntiva
if "call_to_action" in self._prompts.get_required_variables(prompt_name):
variables["call_to_action"] = f"Scopri di più su {brand_name}"
user_prompt = self._prompts.compile_prompt(prompt_name, variables)
post = self._llm.generate(
system_prompt=system_prompt,
user_prompt=user_prompt,
response_schema=GeneratedPost,
)
return post
def _select_prompt_template(self, formato: str, tipo: str) -> str:
"""Mappa formato_narrativo + tipo_contenuto al nome del file prompt.
Logica di selezione:
1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT
2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT
Args:
formato: Formato narrativo (es. "PAS", "AIDA", "BAB").
tipo: Tipo di contenuto (es. "valore", "promozione").
Returns:
Nome del file prompt senza estensione (es. "pas_valore").
"""
# Prova prima la combinazione formato + tipo (per future specializzazioni)
combined_key = f"{formato.lower()}_{tipo.lower()}"
if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}:
return combined_key
# Altrimenti usa la mappa formato -> prompt
return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT)
# ---------------------------------------------------------------------------
# Persistenza su disco
# ---------------------------------------------------------------------------
def _save_job_to_disk(self, job: JobStatus) -> None:
"""Salva i metadata del job in JSON per ricaricamento dopo restart.
Args:
job: JobStatus da persistere.
"""
self._outputs_path.mkdir(parents=True, exist_ok=True)
job_path = self._outputs_path / f"{job.job_id}.json"
# Serializza con Pydantic per gestire Optional e Literal
data = {
"job_id": job.job_id,
"status": job.status,
"total": job.total,
"completed": job.completed,
"current_post": job.current_post,
"campagna": job.campagna,
"error": job.error,
"results": [r.model_dump() for r in job.results],
"calendar": job.calendar.model_dump() if job.calendar else None,
}
with open(job_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path)
def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]:
"""Carica il job dal file JSON su disco.
Args:
job_id: Identificatore del job.
Returns:
JobStatus ricostruito, o None se il file non esiste.
"""
from backend.schemas.calendar import CalendarResponse
job_path = self._outputs_path / f"{job_id}.json"
if not job_path.exists():
return None
try:
with open(job_path, "r", encoding="utf-8") as f:
data = json.load(f)
results = [PostResult.model_validate(r) for r in data.get("results", [])]
calendar = (
CalendarResponse.model_validate(data["calendar"])
if data.get("calendar")
else None
)
job_status = JobStatus(
job_id=data["job_id"],
status=data["status"],
total=data["total"],
completed=data["completed"],
current_post=data.get("current_post", 0),
results=results,
calendar=calendar,
error=data.get("error"),
campagna=data.get("campagna", ""),
)
# Metti in memoria per accesso futuro
self._jobs[job_id] = job_status
return job_status
except Exception as e:
logger.error(
"Errore caricamento job da disco | job_id=%s | errore=%s",
job_id,
str(e),
)
return None

View File

@@ -0,0 +1,284 @@
"""LLMService — chiama Claude API con retry, backoff, rate limit e validazione Pydantic.
Gestisce:
- Retry con backoff esponenziale per errori 5xx
- RateLimitError (429): legge l'header retry-after e attende il tempo esatto
- ValidationError Pydantic: riprova una volta con istruzione correttiva
- Delay inter-request configurabile per rispettare OTPM Tier 1
"""
from __future__ import annotations
import json
import logging
import random
import time
from typing import Type, TypeVar
import anthropic
from pydantic import BaseModel, ValidationError
from backend.schemas.generate import TopicResult
from backend.services.prompt_service import PromptService
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseModel)
class LLMService:
"""Servizio per chiamare Claude API con retry, backoff e validazione schema."""
def __init__(
self,
api_key: str,
model: str = "claude-sonnet-4-5",
max_retries: int = 3,
inter_request_delay: float = 2.0,
) -> None:
"""Inizializza il servizio con il client Anthropic.
Args:
api_key: Chiave API Anthropic.
model: Modello Claude da usare (default: claude-sonnet-4-5).
max_retries: Numero massimo di tentativi per chiamata (default: 3).
inter_request_delay: Secondi di pausa tra chiamate riuscite per
rispettare il rate limit OTPM Tier 1 (default: 2.0).
"""
self._client = anthropic.Anthropic(api_key=api_key)
self._model = model
self._max_retries = max_retries
self._inter_request_delay = inter_request_delay
def generate(
self,
system_prompt: str,
user_prompt: str,
response_schema: Type[T],
) -> T:
"""Chiama Claude e valida la risposta con uno schema Pydantic.
Loop retry con gestione errori specifica per tipo:
- 429 RateLimitError: attende il tempo dall'header retry-after
- 5xx APIStatusError: backoff esponenziale con jitter
- ValidationError: riprova una volta con istruzione correttiva
- Altre eccezioni: solleva immediatamente (no retry)
Dopo ogni chiamata riuscita, applica inter_request_delay.
Args:
system_prompt: Istruzione di sistema per Claude.
user_prompt: Prompt utente con i dettagli del task.
response_schema: Classe Pydantic per validare e parsare la risposta JSON.
Returns:
Istanza del modello Pydantic con i dati validati.
Raises:
anthropic.APIError: Se tutti i tentativi falliscono.
ValueError: Se la risposta non è JSON valido dopo tutti i tentativi.
"""
last_error: Exception | None = None
current_user_prompt = user_prompt
validation_retry_done = False
for attempt in range(self._max_retries):
try:
logger.info(
"Chiamata LLM | modello=%s | tentativo=%d/%d | schema=%s",
self._model,
attempt + 1,
self._max_retries,
response_schema.__name__,
)
t_start = time.perf_counter()
response = self._client.messages.create(
model=self._model,
max_tokens=4096,
system=system_prompt,
messages=[{"role": "user", "content": current_user_prompt}],
)
elapsed = time.perf_counter() - t_start
raw_text = response.content[0].text
logger.info(
"Risposta LLM | tokens_in=%d | tokens_out=%d | elapsed=%.2fs",
response.usage.input_tokens,
response.usage.output_tokens,
elapsed,
)
# Valida con Pydantic
try:
result = response_schema.model_validate_json(raw_text)
# Pausa inter-request dopo chiamata riuscita
time.sleep(self._inter_request_delay)
return result
except ValidationError as ve:
logger.warning(
"ValidationError | schema=%s | tentativo=%d | errori=%s",
response_schema.__name__,
attempt + 1,
ve.errors(),
)
if not validation_retry_done:
# Aggiunge istruzione correttiva al prompt e riprova
validation_retry_done = True
current_user_prompt = (
user_prompt
+ "\n\nIl tuo output precedente non era JSON valido. "
"Rispondi SOLO con JSON valido secondo lo schema. "
"Non aggiungere testo prima o dopo il JSON."
)
last_error = ve
continue
else:
raise ValueError(
f"Risposta LLM non valida dopo {attempt + 1} tentativi. "
f"Schema: {response_schema.__name__}. "
f"Ultimo errore: {ve}"
) from ve
except json.JSONDecodeError as je:
logger.warning(
"JSONDecodeError | tentativo=%d | errore=%s",
attempt + 1,
je,
)
if not validation_retry_done:
validation_retry_done = True
current_user_prompt = (
user_prompt
+ "\n\nIl tuo output precedente non era JSON valido. "
"Rispondi SOLO con JSON valido secondo lo schema. "
"Non aggiungere testo prima o dopo il JSON."
)
last_error = je
continue
else:
raise ValueError(
f"JSON non valido dopo {attempt + 1} tentativi."
) from je
except anthropic.RateLimitError as e:
# 429 — leggi retry-after header e attendi esattamente quel tempo
retry_after = self._parse_retry_after(e)
logger.warning(
"RateLimitError (429) | retry_after=%.1fs | tentativo=%d/%d",
retry_after,
attempt + 1,
self._max_retries,
)
if attempt < self._max_retries - 1:
time.sleep(retry_after)
last_error = e
continue
raise
except anthropic.APIStatusError as e:
if e.status_code >= 500:
# 5xx — backoff esponenziale con jitter
backoff = (2 ** attempt) + random.uniform(0, 1)
logger.warning(
"APIStatusError %d | backoff=%.2fs | tentativo=%d/%d",
e.status_code,
backoff,
attempt + 1,
self._max_retries,
)
if attempt < self._max_retries - 1:
time.sleep(backoff)
last_error = e
continue
raise
else:
# 4xx (non 429) — non ritentare
raise
except Exception:
# Qualsiasi altra eccezione non prevista: non ritentare
raise
# Tutti i tentativi esauriti
if last_error:
raise RuntimeError(
f"LLM fallito dopo {self._max_retries} tentativi. "
f"Ultimo errore: {last_error}"
) from last_error
raise RuntimeError(f"LLM fallito dopo {self._max_retries} tentativi.")
def generate_topic(
self,
system_prompt: str,
obiettivo_campagna: str,
tipo_contenuto: str,
target_nicchia: str,
fase_campagna: str,
livello_schwartz: str,
prompt_service: "PromptService",
) -> str:
"""Genera un topic per uno slot del calendario.
Usa lo stesso loop retry/validation di generate(), passando TopicResult
come response_schema per garantire validazione JSON coerente.
Args:
system_prompt: Istruzione di sistema (system_prompt.txt).
obiettivo_campagna: Obiettivo principale della campagna.
tipo_contenuto: Tipo di contenuto dello slot (es. "valore").
target_nicchia: Nicchia target dello slot (es. "dentisti").
fase_campagna: Fase del funnel (es. "Cattura").
livello_schwartz: Livello consapevolezza (es. "L3").
prompt_service: Istanza PromptService per compilare il template.
Returns:
Stringa topic generata e validata (estratta da TopicResult.topic).
"""
user_prompt = prompt_service.compile_prompt(
"topic_generator",
{
"obiettivo_campagna": obiettivo_campagna,
"tipo_contenuto": tipo_contenuto,
"target_nicchia": target_nicchia,
"fase_campagna": fase_campagna,
"livello_schwartz": livello_schwartz,
},
)
result: TopicResult = self.generate(
system_prompt=system_prompt,
user_prompt=user_prompt,
response_schema=TopicResult,
)
return result.topic
# ---------------------------------------------------------------------------
# Metodi privati
# ---------------------------------------------------------------------------
@staticmethod
def _parse_retry_after(error: anthropic.RateLimitError) -> float:
"""Estrae il valore retry-after dall'eccezione RateLimitError.
Prova a leggere l'header 'retry-after' dalla risposta HTTP.
Se non disponibile, usa il fallback di 60 secondi.
Args:
error: Eccezione RateLimitError da Anthropic.
Returns:
Secondi da attendere prima di riprovare.
"""
default_wait = 60.0
try:
# L'oggetto error ha una proprietà 'response' con gli header HTTP
if hasattr(error, "response") and error.response is not None:
retry_after = error.response.headers.get("retry-after")
if retry_after:
return float(retry_after)
except (AttributeError, ValueError, TypeError):
pass
return default_wait