"""UnsplashService — risolve keyword immagine in URL Unsplash reali. Caratteristiche: - Cerca foto per keyword con orientamento landscape - Cache in-memory + persistenza disco (data/unsplash_cache.json) - Traduzione keyword IT -> EN tramite dizionario statico - Retry automatico su errori di rete (1 tentativo) - Rate limiting awareness tramite header X-Ratelimit-Remaining - Fallback trasparente: keyword non risolte restano keyword testuali """ from __future__ import annotations import json import logging from pathlib import Path from typing import Optional import httpx logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Dizionario di traduzione IT -> EN per keyword B2B comuni # --------------------------------------------------------------------------- _IT_TO_EN: dict[str, str] = { # Ambienti di lavoro "studio": "studio", "ufficio": "office", "scrivania": "desk", "sala riunioni": "meeting room", "riunione": "meeting", # Persone e ruoli "professionista": "professional", "dentista": "dentist", "avvocato": "lawyer", "imprenditore": "entrepreneur", "cliente": "client", "team": "team", "collaborazione": "collaboration", "consulente": "consultant", # Azioni e concetti business "analisi": "analysis", "crescita": "growth", "successo": "success", "strategia": "strategy", "contratto": "contract", "presentazione": "presentation", "azienda": "business", "consulenza": "consulting", "marketing": "marketing", "formazione": "training", "obiettivo": "goal", # Dati e tecnologia "dati": "data", "risultati": "results", "innovazione": "innovation", "tecnologia": "technology", "computer": "computer", "grafici": "charts", # Interazione umana "sorriso": "smile", "stretta di mano": "handshake", # Generico "generico": "business professional", } def _translate_keyword(keyword: str) -> str: """Traduce una keyword italiana in inglese per le query Unsplash. Approccio: 1. Cerca la keyword completa nel dizionario (priorita' massima) 2. Traduce parola per parola e concatena 3. Parole non trovate restano invariate (molte keyword sono gia' in inglese) Args: keyword: Keyword in italiano (o altra lingua) da tradurre. Returns: Keyword tradotta in inglese. """ keyword_lower = keyword.lower().strip() # Prova prima la keyword completa if keyword_lower in _IT_TO_EN: return _IT_TO_EN[keyword_lower] # Traduzione parola per parola words = keyword_lower.split() translated = [] for word in words: translated.append(_IT_TO_EN.get(word, word)) result = " ".join(translated) logger.debug("Traduzione keyword: '%s' -> '%s'", keyword, result) return result # --------------------------------------------------------------------------- # UnsplashService # --------------------------------------------------------------------------- class UnsplashService: """Risolve keyword immagine in URL Unsplash tramite search API. Usa: - Cache in-memory per evitare chiamate duplicate nella stessa sessione - Cache su disco per persistere tra riavvii container - Traduzione IT->EN per massimizzare qualita' risultati - Fallback trasparente su errori o rate limit """ BASE_URL = "https://api.unsplash.com" def __init__(self, api_key: str, cache_path: Path) -> None: """Inizializza il servizio con API key e percorso cache. Args: api_key: Chiave API Unsplash (Client-ID). cache_path: Percorso al file JSON per la cache disco. """ self._api_key = api_key self._cache_path = cache_path self._cache: dict[str, str] = {} self._client = httpx.AsyncClient( base_url=self.BASE_URL, headers={"Authorization": f"Client-ID {api_key}"}, timeout=10.0, ) self._rate_limited = False # Flag per rate limiting del batch corrente # Carica cache da disco se esiste self._load_cache() def _load_cache(self) -> None: """Carica la cache da disco se il file esiste.""" if self._cache_path.exists(): try: data = json.loads(self._cache_path.read_text(encoding="utf-8")) if isinstance(data, dict): self._cache = data logger.info( "Cache Unsplash caricata | entries=%d | path=%s", len(self._cache), self._cache_path, ) except Exception as e: logger.warning("Errore caricamento cache Unsplash: %s", str(e)) self._cache = {} def _save_cache(self) -> None: """Salva la cache su disco.""" try: self._cache_path.parent.mkdir(parents=True, exist_ok=True) self._cache_path.write_text( json.dumps(self._cache, ensure_ascii=False, indent=2), encoding="utf-8", ) logger.debug( "Cache Unsplash salvata | entries=%d | path=%s", len(self._cache), self._cache_path, ) except Exception as e: logger.warning("Errore salvataggio cache Unsplash: %s", str(e)) async def search_photo(self, keyword: str) -> Optional[str]: """Cerca una foto Unsplash per keyword e ritorna l'URL regular (~1080px). Traduce la keyword in inglese prima della ricerca per massimizzare la qualita' dei risultati Unsplash. Args: keyword: Keyword immagine (anche in italiano). Returns: URL dell'immagine (urls.regular ~1080px) o None se non trovata. """ if self._rate_limited: logger.debug("Rate limit attivo, skip ricerca per '%s'", keyword) return None # Traduce keyword per Unsplash query = _translate_keyword(keyword) try: response = await self._client.get( "/search/photos", params={ "query": query, "per_page": 1, "orientation": "landscape", "content_filter": "low", }, ) # Controlla rate limit residuo remaining = int(response.headers.get("X-Ratelimit-Remaining", 100)) if remaining < 5: logger.warning( "Unsplash rate limit quasi esaurito | remaining=%d | stop batch", remaining, ) self._rate_limited = True # Gestisci errori autenticazione (non fare retry) if response.status_code in (401, 403): logger.error( "Unsplash autenticazione fallita | status=%d | api_key_prefix=%s", response.status_code, self._api_key[:8] + "..." if len(self._api_key) > 8 else "...", ) return None response.raise_for_status() data = response.json() results = data.get("results", []) if not results: logger.debug("Nessun risultato Unsplash per '%s' (query='%s')", keyword, query) return None url = results[0].get("urls", {}).get("regular") if url: logger.debug( "Unsplash trovato | keyword='%s' | query='%s' | url=%.50s...", keyword, query, url, ) return url except httpx.HTTPStatusError: # Gia' gestito sopra per 401/403; altri errori HTTP logger.warning("Errore HTTP Unsplash per keyword '%s'", keyword) return None except Exception as e: # Primo retry su errori di rete logger.debug("Primo errore Unsplash per '%s': %s — retry", keyword, str(e)) try: response = await self._client.get( "/search/photos", params={ "query": query, "per_page": 1, "orientation": "landscape", "content_filter": "low", }, ) response.raise_for_status() data = response.json() results = data.get("results", []) if results: return results[0].get("urls", {}).get("regular") return None except Exception as e2: logger.warning( "Errore Unsplash dopo retry | keyword='%s' | errore=%s", keyword, str(e2), ) return None async def resolve_keywords(self, keywords: list[str]) -> dict[str, str]: """Risolve una lista di keyword in URL Unsplash. Usa la cache per evitare chiamate duplicate. Le keyword non risolvibili NON sono nel dizionario ritornato (il caller usa la keyword originale come fallback). Args: keywords: Lista di keyword da risolvere (puo' contenere duplicati). Returns: Dizionario {keyword: url} per le keyword risolte con successo. """ # Deduplicazione unique_keywords = list(dict.fromkeys(keywords)) logger.info( "Risoluzione keyword Unsplash | unique=%d | totali=%d", len(unique_keywords), len(keywords), ) result: dict[str, str] = {} cache_hits = 0 api_calls = 0 new_entries = 0 for keyword in unique_keywords: # Controlla cache in-memory if keyword in self._cache: result[keyword] = self._cache[keyword] cache_hits += 1 logger.debug("Cache hit | keyword='%s'", keyword) continue # Se rate limited, non fare ulteriori chiamate if self._rate_limited: logger.debug("Rate limited, skip '%s'", keyword) continue # Chiama API api_calls += 1 url = await self.search_photo(keyword) if url: self._cache[keyword] = url result[keyword] = url new_entries += 1 logger.info( "Risoluzione completata | cache_hits=%d | api_calls=%d | nuovi=%d | totali_risolti=%d", cache_hits, api_calls, new_entries, len(result), ) # Salva cache su disco se ci sono nuove entries if new_entries > 0: self._save_cache() return result async def close(self) -> None: """Chiude l'httpx.AsyncClient.""" await self._client.aclose() logger.debug("UnsplashService chiuso")