Files
postgenerator/backend/services/llm_service.py
Michele 5870b5eede fix: strip markdown code fences from LLM JSON responses
Claude wraps JSON in ```json ... ``` fences even when instructed to
return raw JSON. This caused all TopicResult validations to fail with
"Invalid JSON at line 1 column 1". Strip fences before parsing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 15:10:39 +01:00

301 lines
11 KiB
Python

"""LLMService — chiama Claude API con retry, backoff, rate limit e validazione Pydantic.
Gestisce:
- Retry con backoff esponenziale per errori 5xx
- RateLimitError (429): legge l'header retry-after e attende il tempo esatto
- ValidationError Pydantic: riprova una volta con istruzione correttiva
- Delay inter-request configurabile per rispettare OTPM Tier 1
"""
from __future__ import annotations
import json
import logging
import random
import re
import time
from typing import Type, TypeVar
import anthropic
from pydantic import BaseModel, ValidationError
from backend.schemas.generate import TopicResult
from backend.services.prompt_service import PromptService
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseModel)
class LLMService:
"""Servizio per chiamare Claude API con retry, backoff e validazione schema."""
def __init__(
self,
api_key: str,
model: str = "claude-sonnet-4-5",
max_retries: int = 3,
inter_request_delay: float = 2.0,
) -> None:
"""Inizializza il servizio con il client Anthropic.
Args:
api_key: Chiave API Anthropic.
model: Modello Claude da usare (default: claude-sonnet-4-5).
max_retries: Numero massimo di tentativi per chiamata (default: 3).
inter_request_delay: Secondi di pausa tra chiamate riuscite per
rispettare il rate limit OTPM Tier 1 (default: 2.0).
"""
self._client = anthropic.Anthropic(api_key=api_key)
self._model = model
self._max_retries = max_retries
self._inter_request_delay = inter_request_delay
def generate(
self,
system_prompt: str,
user_prompt: str,
response_schema: Type[T],
) -> T:
"""Chiama Claude e valida la risposta con uno schema Pydantic.
Loop retry con gestione errori specifica per tipo:
- 429 RateLimitError: attende il tempo dall'header retry-after
- 5xx APIStatusError: backoff esponenziale con jitter
- ValidationError: riprova una volta con istruzione correttiva
- Altre eccezioni: solleva immediatamente (no retry)
Dopo ogni chiamata riuscita, applica inter_request_delay.
Args:
system_prompt: Istruzione di sistema per Claude.
user_prompt: Prompt utente con i dettagli del task.
response_schema: Classe Pydantic per validare e parsare la risposta JSON.
Returns:
Istanza del modello Pydantic con i dati validati.
Raises:
anthropic.APIError: Se tutti i tentativi falliscono.
ValueError: Se la risposta non è JSON valido dopo tutti i tentativi.
"""
last_error: Exception | None = None
current_user_prompt = user_prompt
validation_retry_done = False
for attempt in range(self._max_retries):
try:
logger.info(
"Chiamata LLM | modello=%s | tentativo=%d/%d | schema=%s",
self._model,
attempt + 1,
self._max_retries,
response_schema.__name__,
)
t_start = time.perf_counter()
response = self._client.messages.create(
model=self._model,
max_tokens=4096,
system=system_prompt,
messages=[{"role": "user", "content": current_user_prompt}],
)
elapsed = time.perf_counter() - t_start
raw_text = response.content[0].text
logger.info(
"Risposta LLM | tokens_in=%d | tokens_out=%d | elapsed=%.2fs",
response.usage.input_tokens,
response.usage.output_tokens,
elapsed,
)
# Rimuovi eventuali code fences markdown e valida con Pydantic
clean_text = self._strip_code_fences(raw_text)
try:
result = response_schema.model_validate_json(clean_text)
# Pausa inter-request dopo chiamata riuscita
time.sleep(self._inter_request_delay)
return result
except ValidationError as ve:
logger.warning(
"ValidationError | schema=%s | tentativo=%d | errori=%s",
response_schema.__name__,
attempt + 1,
ve.errors(),
)
if not validation_retry_done:
# Aggiunge istruzione correttiva al prompt e riprova
validation_retry_done = True
current_user_prompt = (
user_prompt
+ "\n\nIl tuo output precedente non era JSON valido. "
"Rispondi SOLO con JSON valido secondo lo schema. "
"Non aggiungere testo prima o dopo il JSON."
)
last_error = ve
continue
else:
raise ValueError(
f"Risposta LLM non valida dopo {attempt + 1} tentativi. "
f"Schema: {response_schema.__name__}. "
f"Ultimo errore: {ve}"
) from ve
except json.JSONDecodeError as je:
logger.warning(
"JSONDecodeError | tentativo=%d | errore=%s",
attempt + 1,
je,
)
if not validation_retry_done:
validation_retry_done = True
current_user_prompt = (
user_prompt
+ "\n\nIl tuo output precedente non era JSON valido. "
"Rispondi SOLO con JSON valido secondo lo schema. "
"Non aggiungere testo prima o dopo il JSON."
)
last_error = je
continue
else:
raise ValueError(
f"JSON non valido dopo {attempt + 1} tentativi."
) from je
except anthropic.RateLimitError as e:
# 429 — leggi retry-after header e attendi esattamente quel tempo
retry_after = self._parse_retry_after(e)
logger.warning(
"RateLimitError (429) | retry_after=%.1fs | tentativo=%d/%d",
retry_after,
attempt + 1,
self._max_retries,
)
if attempt < self._max_retries - 1:
time.sleep(retry_after)
last_error = e
continue
raise
except anthropic.APIStatusError as e:
if e.status_code >= 500:
# 5xx — backoff esponenziale con jitter
backoff = (2 ** attempt) + random.uniform(0, 1)
logger.warning(
"APIStatusError %d | backoff=%.2fs | tentativo=%d/%d",
e.status_code,
backoff,
attempt + 1,
self._max_retries,
)
if attempt < self._max_retries - 1:
time.sleep(backoff)
last_error = e
continue
raise
else:
# 4xx (non 429) — non ritentare
raise
except Exception:
# Qualsiasi altra eccezione non prevista: non ritentare
raise
# Tutti i tentativi esauriti
if last_error:
raise RuntimeError(
f"LLM fallito dopo {self._max_retries} tentativi. "
f"Ultimo errore: {last_error}"
) from last_error
raise RuntimeError(f"LLM fallito dopo {self._max_retries} tentativi.")
def generate_topic(
self,
system_prompt: str,
obiettivo_campagna: str,
tipo_contenuto: str,
target_nicchia: str,
fase_campagna: str,
livello_schwartz: str,
prompt_service: "PromptService",
) -> str:
"""Genera un topic per uno slot del calendario.
Usa lo stesso loop retry/validation di generate(), passando TopicResult
come response_schema per garantire validazione JSON coerente.
Args:
system_prompt: Istruzione di sistema (system_prompt.txt).
obiettivo_campagna: Obiettivo principale della campagna.
tipo_contenuto: Tipo di contenuto dello slot (es. "valore").
target_nicchia: Nicchia target dello slot (es. "dentisti").
fase_campagna: Fase del funnel (es. "Cattura").
livello_schwartz: Livello consapevolezza (es. "L3").
prompt_service: Istanza PromptService per compilare il template.
Returns:
Stringa topic generata e validata (estratta da TopicResult.topic).
"""
user_prompt = prompt_service.compile_prompt(
"topic_generator",
{
"obiettivo_campagna": obiettivo_campagna,
"tipo_contenuto": tipo_contenuto,
"target_nicchia": target_nicchia,
"fase_campagna": fase_campagna,
"livello_schwartz": livello_schwartz,
},
)
result: TopicResult = self.generate(
system_prompt=system_prompt,
user_prompt=user_prompt,
response_schema=TopicResult,
)
return result.topic
# ---------------------------------------------------------------------------
# Metodi privati
# ---------------------------------------------------------------------------
@staticmethod
def _strip_code_fences(text: str) -> str:
"""Rimuove i code fences markdown dalla risposta LLM.
Claude a volte wrappa il JSON in ```json ... ``` anche quando
gli si chiede di rispondere solo con JSON.
"""
stripped = text.strip()
# Rimuove ```json ... ``` o ``` ... ```
match = re.match(r"^```(?:json)?\s*\n?(.*?)\n?\s*```$", stripped, re.DOTALL)
if match:
return match.group(1).strip()
return stripped
@staticmethod
def _parse_retry_after(error: anthropic.RateLimitError) -> float:
"""Estrae il valore retry-after dall'eccezione RateLimitError.
Prova a leggere l'header 'retry-after' dalla risposta HTTP.
Se non disponibile, usa il fallback di 60 secondi.
Args:
error: Eccezione RateLimitError da Anthropic.
Returns:
Secondi da attendere prima di riprovare.
"""
default_wait = 60.0
try:
# L'oggetto error ha una proprietà 'response' con gli header HTTP
if hasattr(error, "response") and error.response is not None:
retry_after = error.response.headers.get("retry-after")
if retry_after:
return float(retry_after)
except (AttributeError, ValueError, TypeError):
pass
return default_wait