From 083621afd36a0c68891030b230d094146d96f4da Mon Sep 17 00:00:00 2001 From: Michele Date: Sun, 8 Mar 2026 02:10:28 +0100 Subject: [PATCH] feat(01-03): LLMService, CSVBuilder, GenerationPipeline - LLMService: retry 3x, RateLimitError legge retry-after header, 5xx backoff esponenziale, ValidationError riprova con istruzione correttiva, inter_request_delay 2s - LLMService.generate_topic(): usa TopicResult come response_schema (passa per loop retry/validation) - CSVBuilder: encoding utf-8-sig, header CANVA_FIELDS locked, mappa GeneratedPost+CalendarSlot -> 33 colonne - GenerationPipeline: background task asyncio.create_task, _jobs dict progresso real-time, per-item try/except individuale, persistenza JSON su disco --- backend/services/csv_builder.py | 181 ++++++++ backend/services/generation_pipeline.py | 571 ++++++++++++++++++++++++ backend/services/llm_service.py | 284 ++++++++++++ 3 files changed, 1036 insertions(+) create mode 100644 backend/services/csv_builder.py create mode 100644 backend/services/generation_pipeline.py create mode 100644 backend/services/llm_service.py diff --git a/backend/services/csv_builder.py b/backend/services/csv_builder.py new file mode 100644 index 0000000..ed705be --- /dev/null +++ b/backend/services/csv_builder.py @@ -0,0 +1,181 @@ +"""CSVBuilder — produce CSV compatibile con Canva Bulk Create. + +Caratteristiche: +- Encoding utf-8-sig (BOM) — critico per Excel + caratteri italiani (Pitfall 3) +- Header CANVA_FIELDS locked — 33 colonne esatte +- Mappa GeneratedPost + CalendarSlot -> riga CSV +- Filtra solo PostResult con status="success" +- Scrive su disco in OUTPUTS_PATH/{job_id}.csv +""" + +from __future__ import annotations + +import csv +import io +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from backend.constants import CANVA_FIELDS +from backend.schemas.generate import PostResult + +if TYPE_CHECKING: + from backend.schemas.calendar import CalendarResponse + + +logger = logging.getLogger(__name__) + + +class CSVBuilder: + """Costruisce file CSV Canva-compatibili dai risultati di generazione.""" + + def build_csv( + self, + posts: list[PostResult], + calendar: "CalendarResponse", + job_id: str, + output_dir: Path, + ) -> Path: + """Genera e scrive il CSV su disco. + + Filtra solo i PostResult con status="success", mappa i dati + GeneratedPost + CalendarSlot alle 33 colonne CANVA_FIELDS, + e scrive con encoding utf-8-sig per compatibilità Excel. + + Args: + posts: Lista di PostResult (include success e failed). + calendar: CalendarResponse con i metadati degli slot. + job_id: Identificatore univoco del job (usato come nome file). + output_dir: Directory dove scrivere il file CSV. + + Returns: + Path del file CSV scritto su disco. + """ + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / f"{job_id}.csv" + + rows = self._build_rows(posts, calendar) + + # CRITICO: encoding utf-8-sig (BOM) per Excel + caratteri italiani + with open(output_path, "w", newline="", encoding="utf-8-sig") as f: + writer = csv.DictWriter(f, fieldnames=CANVA_FIELDS, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + logger.info( + "CSV scritto | job_id=%s | righe_success=%d | path=%s", + job_id, + len(rows), + output_path, + ) + return output_path + + def build_csv_content( + self, + posts: list[PostResult], + calendar: "CalendarResponse", + job_id: str, + ) -> str: + """Genera il CSV come stringa (senza scrivere su disco). + + Usato per preview e per la route POST /export/{job_id}/csv + con dati modificati inline dall'utente. + + Args: + posts: Lista di PostResult (include success e failed). + calendar: CalendarResponse con i metadati degli slot. + job_id: Identificatore univoco del job. + + Returns: + Stringa CSV con encoding utf-8-sig (BOM). + """ + rows = self._build_rows(posts, calendar) + + output = io.StringIO() + # Aggiungi BOM manualmente per compatibilità Excel + output.write("\ufeff") + writer = csv.DictWriter(output, fieldnames=CANVA_FIELDS, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + return output.getvalue() + + # --------------------------------------------------------------------------- + # Metodi privati + # --------------------------------------------------------------------------- + + def _build_rows( + self, + posts: list[PostResult], + calendar: "CalendarResponse", + ) -> list[dict[str, str]]: + """Costruisce la lista di righe CSV dai risultati. + + Filtra solo i post con status="success" e mappa i dati + GeneratedPost + CalendarSlot alle colonne CANVA_FIELDS. + + Args: + posts: Lista completa di PostResult. + calendar: CalendarResponse con i metadati degli slot. + + Returns: + Lista di dict con chiavi = CANVA_FIELDS. + """ + # Crea un dizionario slot_index -> CalendarSlot per lookup veloce + slot_map = {slot.indice: slot for slot in calendar.slots} + + rows: list[dict[str, str]] = [] + for post_result in posts: + if post_result.status != "success" or post_result.post is None: + continue + + slot_index = post_result.slot_index + slot = slot_map.get(slot_index) + if slot is None: + logger.warning( + "Slot non trovato per slot_index=%d, skip", slot_index + ) + continue + + post = post_result.post + row: dict[str, str] = {} + + # --- Metadati slot (8 colonne) --- + row["campagna"] = calendar.campagna + row["fase_campagna"] = slot.fase_campagna + row["tipo_contenuto"] = slot.tipo_contenuto + row["formato_narrativo"] = slot.formato_narrativo + row["funzione"] = slot.funzione + row["livello_schwartz"] = slot.livello_schwartz + row["target_nicchia"] = slot.target_nicchia + row["data_pub_suggerita"] = slot.data_pub_suggerita + + # --- Cover slide (3 colonne) --- + row["cover_title"] = post.cover_title + row["cover_subtitle"] = post.cover_subtitle + row["cover_image_keyword"] = post.cover_image_keyword + + # --- Slide centrali s2-s7 (6 slide x 3 colonne = 18 colonne) --- + slide_labels = ["s2", "s3", "s4", "s5", "s6", "s7"] + for idx, label in enumerate(slide_labels): + if idx < len(post.slides): + slide = post.slides[idx] + row[f"{label}_headline"] = slide.headline + row[f"{label}_body"] = slide.body + row[f"{label}_image_keyword"] = slide.image_keyword + else: + # Fallback se slides ha meno di 6 elementi (non dovrebbe accadere) + row[f"{label}_headline"] = "" + row[f"{label}_body"] = "" + row[f"{label}_image_keyword"] = "" + + # --- CTA slide (3 colonne) --- + row["cta_text"] = post.cta_text + row["cta_subtext"] = post.cta_subtext + row["cta_image_keyword"] = post.cta_image_keyword + + # --- Caption Instagram (1 colonna) --- + row["caption_instagram"] = post.caption_instagram + + rows.append(row) + + return rows diff --git a/backend/services/generation_pipeline.py b/backend/services/generation_pipeline.py new file mode 100644 index 0000000..9cbae3e --- /dev/null +++ b/backend/services/generation_pipeline.py @@ -0,0 +1,571 @@ +"""GenerationPipeline — orchestra calendario -> LLM -> CSV con per-item error isolation. + +Gestisce: +- Generazione async in background (asyncio.create_task) +- Progresso real-time tracciato in _jobs dict (completed/total/current_post) +- Per-item error isolation: un fallimento NON blocca il batch (Pitfall 5) +- Persistenza su disco per ricaricamento dopo restart +- Job status per polling dal frontend +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal, Optional + +from backend.schemas.calendar import CalendarRequest, CalendarResponse, CalendarSlot +from backend.schemas.generate import ( + GenerateResponse, + GeneratedPost, + PostResult, + TopicResult, +) +from backend.services.calendar_service import CalendarService +from backend.services.csv_builder import CSVBuilder +from backend.services.format_selector import FormatSelector +from backend.services.llm_service import LLMService +from backend.services.prompt_service import PromptService + + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Mappa formato_narrativo -> nome file prompt +# --------------------------------------------------------------------------- + +_FORMAT_TO_PROMPT: dict[str, str] = { + "PAS": "pas_valore", + "AIDA": "aida_promozione", + "BAB": "bab_storytelling", + "Listicle": "listicle_valore", + "Storytelling": "bab_storytelling", # fallback su BAB per storytelling + "Dato_Implicazione": "dato_news", + "Obiezione_Risposta": "pas_valore", # fallback su PAS per obiezione/risposta +} + +_DEFAULT_PROMPT = "pas_valore" + + +# --------------------------------------------------------------------------- +# Dataclass per tracciare lo stato di un job +# --------------------------------------------------------------------------- + +@dataclass +class JobStatus: + """Stato real-time di un job di generazione batch.""" + + job_id: str + status: Literal["running", "completed", "failed"] + total: int + completed: int + current_post: int + results: list[PostResult] = field(default_factory=list) + calendar: Optional[CalendarResponse] = None + error: Optional[str] = None + campagna: str = "" + + +# --------------------------------------------------------------------------- +# Pipeline principale +# --------------------------------------------------------------------------- + +class GenerationPipeline: + """Orchestra il flusso completo: calendario -> topic -> LLM -> CSV.""" + + def __init__( + self, + llm_service: LLMService, + prompt_service: PromptService, + calendar_service: CalendarService, + format_selector: FormatSelector, + csv_builder: CSVBuilder, + outputs_path: Path, + ) -> None: + """Inizializza la pipeline con i servizi necessari. + + Args: + llm_service: Servizio per chiamate LLM con retry. + prompt_service: Servizio per caricare/compilare prompt .txt. + calendar_service: Servizio per generare il calendario editoriale. + format_selector: Selettore formato narrativo per slot. + csv_builder: Builder per CSV Canva-compatibile. + outputs_path: Directory dove salvare i file CSV e JSON dei job. + """ + self._llm = llm_service + self._prompts = prompt_service + self._calendar = calendar_service + self._formats = format_selector + self._csv = csv_builder + self._outputs_path = outputs_path + + # Dizionario in-memory per lo stato real-time dei job + self._jobs: dict[str, JobStatus] = {} + + # --------------------------------------------------------------------------- + # API pubblica + # --------------------------------------------------------------------------- + + def generate_bulk_async( + self, + request: CalendarRequest, + ) -> str: + """Avvia la generazione batch come background task e ritorna subito. + + Questo metodo è SINCRONO dal punto di vista del caller: genera il job_id, + inizializza lo stato, lancia il background task, e ritorna immediatamente. + Il frontend può fare polling su get_job_status() per seguire il progresso. + + Args: + request: Parametri del calendario (obiettivo, nicchie, etc.). + + Returns: + job_id (UUID stringa) da usare per polling e download CSV. + """ + job_id = str(uuid.uuid4()) + + # Genera il calendario in modo sincrono (è CPU-bound, veloce) + calendar = self._calendar.generate_calendar(request) + + # Inizializza stato job + self._jobs[job_id] = JobStatus( + job_id=job_id, + status="running", + total=len(calendar.slots), + completed=0, + current_post=0, + results=[], + calendar=calendar, + campagna=request.obiettivo_campagna, + ) + + logger.info( + "Job avviato | job_id=%s | slot_totali=%d | campagna=%s", + job_id, + len(calendar.slots), + request.obiettivo_campagna[:50], + ) + + # Lancia il background task (non blocca) + asyncio.create_task( + self._run_generation(job_id, calendar, request) + ) + + return job_id + + def get_job_status(self, job_id: str) -> Optional[JobStatus]: + """Ritorna lo stato corrente del job per polling. + + Se il job non è in memoria, prova a caricarlo dal file JSON su disco + (per supportare restart del server). + + Args: + job_id: Identificatore del job. + + Returns: + JobStatus aggiornato, o None se il job non esiste. + """ + if job_id in self._jobs: + return self._jobs[job_id] + + # Prova a caricare da disco + return self._load_job_from_disk(job_id) + + def get_job_results(self, job_id: str) -> Optional[GenerateResponse]: + """Ritorna i risultati completi di un job completato. + + Args: + job_id: Identificatore del job. + + Returns: + GenerateResponse con tutti i PostResult, o None se non trovato. + """ + status = self.get_job_status(job_id) + if status is None: + return None + + success_count = sum(1 for r in status.results if r.status == "success") + failed_count = sum(1 for r in status.results if r.status == "failed") + + return GenerateResponse( + campagna=status.campagna, + results=status.results, + total=status.total, + success_count=success_count, + failed_count=failed_count, + ) + + def generate_single( + self, + slot: CalendarSlot, + obiettivo_campagna: str, + brand_name: Optional[str] = None, + tono: Optional[str] = None, + ) -> PostResult: + """Genera un singolo post per uno slot. + + Utile per rigenerare post falliti o per test. + + Args: + slot: Slot del calendario con metadati strategici. + obiettivo_campagna: Obiettivo principale della campagna. + brand_name: Nome del brand (opzionale). + tono: Tono di voce (opzionale). + + Returns: + PostResult con status="success" o "failed". + """ + system_prompt = self._prompts.load_prompt("system_prompt") + + try: + # Genera topic se mancante + topic = slot.topic + if not topic: + topic = self._llm.generate_topic( + system_prompt=system_prompt, + obiettivo_campagna=obiettivo_campagna, + tipo_contenuto=slot.tipo_contenuto, + target_nicchia=slot.target_nicchia, + fase_campagna=slot.fase_campagna, + livello_schwartz=slot.livello_schwartz, + prompt_service=self._prompts, + ) + + # Genera il post + post = self._generate_post_for_slot( + slot=slot, + topic=topic, + obiettivo_campagna=obiettivo_campagna, + brand_name=brand_name or "il tuo brand", + tono=tono or "diretto e concreto", + system_prompt=system_prompt, + ) + + return PostResult( + slot_index=slot.indice, + status="success", + post=post, + ) + + except Exception as e: + logger.error( + "Errore generazione singola | slot_index=%d | errore=%s", + slot.indice, + str(e), + ) + return PostResult( + slot_index=slot.indice, + status="failed", + error=str(e), + ) + + # --------------------------------------------------------------------------- + # Background task + # --------------------------------------------------------------------------- + + async def _run_generation( + self, + job_id: str, + calendar: CalendarResponse, + request: CalendarRequest, + ) -> None: + """Task asincrono che genera tutti i post del batch. + + Esegue il loop su ogni slot del calendario con per-item error isolation: + ogni slot è in un try/except INDIVIDUALE — un fallimento NON blocca il loop. + + Args: + job_id: Identificatore del job. + calendar: Calendario con gli slot da generare. + request: Parametri della richiesta originale. + """ + job = self._jobs[job_id] + system_prompt = self._prompts.load_prompt("system_prompt") + brand_name = "il tuo brand" # Verrà preso da settings in fase successiva + tono = "diretto e concreto" + + try: + for slot in calendar.slots: + # Aggiorna progresso real-time + job.current_post = slot.indice + + # CRITICO Pitfall 5: try/except INDIVIDUALE per slot + # Un fallimento NON blocca il batch + try: + logger.info( + "Generazione slot | job_id=%s | slot=%d/%d | tipo=%s | nicchia=%s", + job_id, + slot.indice + 1, + job.total, + slot.tipo_contenuto, + slot.target_nicchia, + ) + + # Genera topic se non presente nello slot + topic = slot.topic + if not topic: + # Usa asyncio.to_thread per non bloccare l'event loop + # (generate_topic usa time.sleep per inter_request_delay) + topic = await asyncio.to_thread( + self._llm.generate_topic, + system_prompt, + request.obiettivo_campagna, + slot.tipo_contenuto, + slot.target_nicchia, + slot.fase_campagna, + slot.livello_schwartz, + self._prompts, + ) + + # Genera il post completo + post = await asyncio.to_thread( + self._generate_post_for_slot, + slot, + topic, + request.obiettivo_campagna, + brand_name, + tono, + system_prompt, + ) + + post_result = PostResult( + slot_index=slot.indice, + status="success", + post=post, + ) + logger.info( + "Slot completato | job_id=%s | slot=%d | status=success", + job_id, + slot.indice, + ) + + except Exception as e: + # Fallimento isolato — logga e continua con il prossimo slot + logger.error( + "Errore slot | job_id=%s | slot=%d | errore=%s", + job_id, + slot.indice, + str(e), + ) + post_result = PostResult( + slot_index=slot.indice, + status="failed", + error=str(e), + ) + + # Aggiorna progresso (sia per success che per failed) + job.results.append(post_result) + job.completed += 1 + + # Genera CSV con i risultati + success_results = [r for r in job.results if r.status == "success"] + if success_results: + self._outputs_path.mkdir(parents=True, exist_ok=True) + self._csv.build_csv( + posts=job.results, + calendar=calendar, + job_id=job_id, + output_dir=self._outputs_path, + ) + logger.info( + "CSV generato | job_id=%s | success=%d/%d", + job_id, + len(success_results), + job.total, + ) + + # Salva metadata job su disco per persistenza + self._save_job_to_disk(job) + + # Aggiorna stato finale + job.status = "completed" + logger.info( + "Job completato | job_id=%s | success=%d | failed=%d", + job_id, + sum(1 for r in job.results if r.status == "success"), + sum(1 for r in job.results if r.status == "failed"), + ) + + except Exception as e: + # Errore catastrofico (non isolabile a un singolo slot) + job.status = "failed" + job.error = str(e) + logger.error( + "Job fallito | job_id=%s | errore=%s", + job_id, + str(e), + exc_info=True, + ) + + # --------------------------------------------------------------------------- + # Generazione singolo post + # --------------------------------------------------------------------------- + + def _generate_post_for_slot( + self, + slot: CalendarSlot, + topic: str, + obiettivo_campagna: str, + brand_name: str, + tono: str, + system_prompt: str, + ) -> GeneratedPost: + """Genera il contenuto completo di un post dato lo slot e il topic. + + Seleziona il prompt template corretto in base al formato_narrativo, + compila il prompt con le variabili, chiama l'LLM. + + Args: + slot: Slot con metadati strategici. + topic: Topic specifico del post (già generato). + obiettivo_campagna: Obiettivo della campagna. + brand_name: Nome del brand. + tono: Tono di voce. + system_prompt: System prompt caricato. + + Returns: + GeneratedPost validato con Pydantic. + """ + prompt_name = self._select_prompt_template( + slot.formato_narrativo, slot.tipo_contenuto + ) + + # Verifica che il prompt template esista + if not self._prompts.prompt_exists(prompt_name): + logger.warning( + "Prompt '%s' non trovato, uso fallback '%s'", + prompt_name, + _DEFAULT_PROMPT, + ) + prompt_name = _DEFAULT_PROMPT + + # Prepara le variabili comuni a tutti i prompt + variables: dict[str, str] = { + "obiettivo_campagna": obiettivo_campagna, + "topic": topic, + "target_nicchia": slot.target_nicchia, + "livello_schwartz": slot.livello_schwartz, + "brand_name": brand_name, + } + + # Aggiungi variabili specifiche per alcuni prompt + # aida_promozione ha {{call_to_action}} come variabile aggiuntiva + if "call_to_action" in self._prompts.get_required_variables(prompt_name): + variables["call_to_action"] = f"Scopri di più su {brand_name}" + + user_prompt = self._prompts.compile_prompt(prompt_name, variables) + + post = self._llm.generate( + system_prompt=system_prompt, + user_prompt=user_prompt, + response_schema=GeneratedPost, + ) + return post + + def _select_prompt_template(self, formato: str, tipo: str) -> str: + """Mappa formato_narrativo + tipo_contenuto al nome del file prompt. + + Logica di selezione: + 1. Cerca corrispondenza esatta in _FORMAT_TO_PROMPT + 2. Se non trovata, usa il fallback globale _DEFAULT_PROMPT + + Args: + formato: Formato narrativo (es. "PAS", "AIDA", "BAB"). + tipo: Tipo di contenuto (es. "valore", "promozione"). + + Returns: + Nome del file prompt senza estensione (es. "pas_valore"). + """ + # Prova prima la combinazione formato + tipo (per future specializzazioni) + combined_key = f"{formato.lower()}_{tipo.lower()}" + if combined_key in {v for v in _FORMAT_TO_PROMPT.values()}: + return combined_key + + # Altrimenti usa la mappa formato -> prompt + return _FORMAT_TO_PROMPT.get(formato, _DEFAULT_PROMPT) + + # --------------------------------------------------------------------------- + # Persistenza su disco + # --------------------------------------------------------------------------- + + def _save_job_to_disk(self, job: JobStatus) -> None: + """Salva i metadata del job in JSON per ricaricamento dopo restart. + + Args: + job: JobStatus da persistere. + """ + self._outputs_path.mkdir(parents=True, exist_ok=True) + job_path = self._outputs_path / f"{job.job_id}.json" + + # Serializza con Pydantic per gestire Optional e Literal + data = { + "job_id": job.job_id, + "status": job.status, + "total": job.total, + "completed": job.completed, + "current_post": job.current_post, + "campagna": job.campagna, + "error": job.error, + "results": [r.model_dump() for r in job.results], + "calendar": job.calendar.model_dump() if job.calendar else None, + } + + with open(job_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + logger.debug("Job salvato su disco | job_id=%s | path=%s", job.job_id, job_path) + + def _load_job_from_disk(self, job_id: str) -> Optional[JobStatus]: + """Carica il job dal file JSON su disco. + + Args: + job_id: Identificatore del job. + + Returns: + JobStatus ricostruito, o None se il file non esiste. + """ + from backend.schemas.calendar import CalendarResponse + + job_path = self._outputs_path / f"{job_id}.json" + if not job_path.exists(): + return None + + try: + with open(job_path, "r", encoding="utf-8") as f: + data = json.load(f) + + results = [PostResult.model_validate(r) for r in data.get("results", [])] + calendar = ( + CalendarResponse.model_validate(data["calendar"]) + if data.get("calendar") + else None + ) + + job_status = JobStatus( + job_id=data["job_id"], + status=data["status"], + total=data["total"], + completed=data["completed"], + current_post=data.get("current_post", 0), + results=results, + calendar=calendar, + error=data.get("error"), + campagna=data.get("campagna", ""), + ) + + # Metti in memoria per accesso futuro + self._jobs[job_id] = job_status + return job_status + + except Exception as e: + logger.error( + "Errore caricamento job da disco | job_id=%s | errore=%s", + job_id, + str(e), + ) + return None diff --git a/backend/services/llm_service.py b/backend/services/llm_service.py new file mode 100644 index 0000000..ac83ab4 --- /dev/null +++ b/backend/services/llm_service.py @@ -0,0 +1,284 @@ +"""LLMService — chiama Claude API con retry, backoff, rate limit e validazione Pydantic. + +Gestisce: +- Retry con backoff esponenziale per errori 5xx +- RateLimitError (429): legge l'header retry-after e attende il tempo esatto +- ValidationError Pydantic: riprova una volta con istruzione correttiva +- Delay inter-request configurabile per rispettare OTPM Tier 1 +""" + +from __future__ import annotations + +import json +import logging +import random +import time +from typing import Type, TypeVar + +import anthropic +from pydantic import BaseModel, ValidationError + +from backend.schemas.generate import TopicResult +from backend.services.prompt_service import PromptService + + +logger = logging.getLogger(__name__) + +T = TypeVar("T", bound=BaseModel) + + +class LLMService: + """Servizio per chiamare Claude API con retry, backoff e validazione schema.""" + + def __init__( + self, + api_key: str, + model: str = "claude-sonnet-4-5", + max_retries: int = 3, + inter_request_delay: float = 2.0, + ) -> None: + """Inizializza il servizio con il client Anthropic. + + Args: + api_key: Chiave API Anthropic. + model: Modello Claude da usare (default: claude-sonnet-4-5). + max_retries: Numero massimo di tentativi per chiamata (default: 3). + inter_request_delay: Secondi di pausa tra chiamate riuscite per + rispettare il rate limit OTPM Tier 1 (default: 2.0). + """ + self._client = anthropic.Anthropic(api_key=api_key) + self._model = model + self._max_retries = max_retries + self._inter_request_delay = inter_request_delay + + def generate( + self, + system_prompt: str, + user_prompt: str, + response_schema: Type[T], + ) -> T: + """Chiama Claude e valida la risposta con uno schema Pydantic. + + Loop retry con gestione errori specifica per tipo: + - 429 RateLimitError: attende il tempo dall'header retry-after + - 5xx APIStatusError: backoff esponenziale con jitter + - ValidationError: riprova una volta con istruzione correttiva + - Altre eccezioni: solleva immediatamente (no retry) + + Dopo ogni chiamata riuscita, applica inter_request_delay. + + Args: + system_prompt: Istruzione di sistema per Claude. + user_prompt: Prompt utente con i dettagli del task. + response_schema: Classe Pydantic per validare e parsare la risposta JSON. + + Returns: + Istanza del modello Pydantic con i dati validati. + + Raises: + anthropic.APIError: Se tutti i tentativi falliscono. + ValueError: Se la risposta non è JSON valido dopo tutti i tentativi. + """ + last_error: Exception | None = None + current_user_prompt = user_prompt + validation_retry_done = False + + for attempt in range(self._max_retries): + try: + logger.info( + "Chiamata LLM | modello=%s | tentativo=%d/%d | schema=%s", + self._model, + attempt + 1, + self._max_retries, + response_schema.__name__, + ) + t_start = time.perf_counter() + + response = self._client.messages.create( + model=self._model, + max_tokens=4096, + system=system_prompt, + messages=[{"role": "user", "content": current_user_prompt}], + ) + + elapsed = time.perf_counter() - t_start + raw_text = response.content[0].text + + logger.info( + "Risposta LLM | tokens_in=%d | tokens_out=%d | elapsed=%.2fs", + response.usage.input_tokens, + response.usage.output_tokens, + elapsed, + ) + + # Valida con Pydantic + try: + result = response_schema.model_validate_json(raw_text) + # Pausa inter-request dopo chiamata riuscita + time.sleep(self._inter_request_delay) + return result + + except ValidationError as ve: + logger.warning( + "ValidationError | schema=%s | tentativo=%d | errori=%s", + response_schema.__name__, + attempt + 1, + ve.errors(), + ) + if not validation_retry_done: + # Aggiunge istruzione correttiva al prompt e riprova + validation_retry_done = True + current_user_prompt = ( + user_prompt + + "\n\nIl tuo output precedente non era JSON valido. " + "Rispondi SOLO con JSON valido secondo lo schema. " + "Non aggiungere testo prima o dopo il JSON." + ) + last_error = ve + continue + else: + raise ValueError( + f"Risposta LLM non valida dopo {attempt + 1} tentativi. " + f"Schema: {response_schema.__name__}. " + f"Ultimo errore: {ve}" + ) from ve + + except json.JSONDecodeError as je: + logger.warning( + "JSONDecodeError | tentativo=%d | errore=%s", + attempt + 1, + je, + ) + if not validation_retry_done: + validation_retry_done = True + current_user_prompt = ( + user_prompt + + "\n\nIl tuo output precedente non era JSON valido. " + "Rispondi SOLO con JSON valido secondo lo schema. " + "Non aggiungere testo prima o dopo il JSON." + ) + last_error = je + continue + else: + raise ValueError( + f"JSON non valido dopo {attempt + 1} tentativi." + ) from je + + except anthropic.RateLimitError as e: + # 429 — leggi retry-after header e attendi esattamente quel tempo + retry_after = self._parse_retry_after(e) + logger.warning( + "RateLimitError (429) | retry_after=%.1fs | tentativo=%d/%d", + retry_after, + attempt + 1, + self._max_retries, + ) + if attempt < self._max_retries - 1: + time.sleep(retry_after) + last_error = e + continue + raise + + except anthropic.APIStatusError as e: + if e.status_code >= 500: + # 5xx — backoff esponenziale con jitter + backoff = (2 ** attempt) + random.uniform(0, 1) + logger.warning( + "APIStatusError %d | backoff=%.2fs | tentativo=%d/%d", + e.status_code, + backoff, + attempt + 1, + self._max_retries, + ) + if attempt < self._max_retries - 1: + time.sleep(backoff) + last_error = e + continue + raise + else: + # 4xx (non 429) — non ritentare + raise + + except Exception: + # Qualsiasi altra eccezione non prevista: non ritentare + raise + + # Tutti i tentativi esauriti + if last_error: + raise RuntimeError( + f"LLM fallito dopo {self._max_retries} tentativi. " + f"Ultimo errore: {last_error}" + ) from last_error + raise RuntimeError(f"LLM fallito dopo {self._max_retries} tentativi.") + + def generate_topic( + self, + system_prompt: str, + obiettivo_campagna: str, + tipo_contenuto: str, + target_nicchia: str, + fase_campagna: str, + livello_schwartz: str, + prompt_service: "PromptService", + ) -> str: + """Genera un topic per uno slot del calendario. + + Usa lo stesso loop retry/validation di generate(), passando TopicResult + come response_schema per garantire validazione JSON coerente. + + Args: + system_prompt: Istruzione di sistema (system_prompt.txt). + obiettivo_campagna: Obiettivo principale della campagna. + tipo_contenuto: Tipo di contenuto dello slot (es. "valore"). + target_nicchia: Nicchia target dello slot (es. "dentisti"). + fase_campagna: Fase del funnel (es. "Cattura"). + livello_schwartz: Livello consapevolezza (es. "L3"). + prompt_service: Istanza PromptService per compilare il template. + + Returns: + Stringa topic generata e validata (estratta da TopicResult.topic). + """ + user_prompt = prompt_service.compile_prompt( + "topic_generator", + { + "obiettivo_campagna": obiettivo_campagna, + "tipo_contenuto": tipo_contenuto, + "target_nicchia": target_nicchia, + "fase_campagna": fase_campagna, + "livello_schwartz": livello_schwartz, + }, + ) + result: TopicResult = self.generate( + system_prompt=system_prompt, + user_prompt=user_prompt, + response_schema=TopicResult, + ) + return result.topic + + # --------------------------------------------------------------------------- + # Metodi privati + # --------------------------------------------------------------------------- + + @staticmethod + def _parse_retry_after(error: anthropic.RateLimitError) -> float: + """Estrae il valore retry-after dall'eccezione RateLimitError. + + Prova a leggere l'header 'retry-after' dalla risposta HTTP. + Se non disponibile, usa il fallback di 60 secondi. + + Args: + error: Eccezione RateLimitError da Anthropic. + + Returns: + Secondi da attendere prima di riprovare. + """ + default_wait = 60.0 + try: + # L'oggetto error ha una proprietà 'response' con gli header HTTP + if hasattr(error, "response") and error.response is not None: + retry_after = error.response.headers.get("retry-after") + if retry_after: + return float(retry_after) + except (AttributeError, ValueError, TypeError): + pass + return default_wait