Initial commit: Leopost Full — merge di Leopost, Post Generator e Autopilot OS

- Backend FastAPI con multi-LLM (Claude/OpenAI/Gemini)
- Publishing su Facebook, Instagram, YouTube, TikTok
- Calendario editoriale con awareness levels (PAS, AIDA, BAB...)
- Design system Editorial Fresh (Fraunces + DM Sans)
- Scheduler automatico, gestione commenti AI, affiliate links

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Michele
2026-03-31 17:23:16 +02:00
commit 519a580679
58 changed files with 8348 additions and 0 deletions

View File

View File

@@ -0,0 +1,168 @@
"""CalendarService — genera il calendario editoriale con awareness levels (Schwartz).
Versione adattata per Leopost Full (standalone, senza dipendenze da postgenerator).
Genera un piano di pubblicazione con:
- Formati narrativi: PAS, AIDA, BAB, Storytelling, Listicle, Dato_Implicazione
- Awareness levels (Schwartz): 1-Unaware, 2-Problem Aware, 3-Solution Aware,
4-Product Aware, 5-Most Aware
- Date di pubblicazione suggerite
"""
from __future__ import annotations
from datetime import date, timedelta
from typing import Optional
# ---------------------------------------------------------------------------
# Costanti
# ---------------------------------------------------------------------------
FORMATI_NARRATIVI = [
"PAS",
"AIDA",
"BAB",
"Storytelling",
"Listicle",
"Dato_Implicazione",
]
AWARENESS_LEVELS = {
1: "Unaware",
2: "Problem Aware",
3: "Solution Aware",
4: "Product Aware",
5: "Most Aware",
}
# Mapping formato narrativo -> awareness levels consigliati
_FORMATO_TO_LEVELS: dict[str, list[int]] = {
"PAS": [2, 3],
"AIDA": [3, 4, 5],
"BAB": [2, 3],
"Storytelling": [1, 2],
"Listicle": [2, 3],
"Dato_Implicazione": [1, 2, 3],
}
# Distribuzione default per generazione automatica
_DEFAULT_DISTRIBUTION = [
("Storytelling", 1),
("Dato_Implicazione", 2),
("PAS", 2),
("Listicle", 3),
("AIDA", 4),
("BAB", 3),
("AIDA", 5),
]
# ---------------------------------------------------------------------------
# CalendarService
# ---------------------------------------------------------------------------
class CalendarService:
"""Genera il calendario editoriale con awareness levels e formati narrativi."""
def generate_calendar(
self,
topics: list[str],
num_posts: int = 7,
format_narrativo: Optional[str] = None,
awareness_level: Optional[int] = None,
start_date: Optional[str] = None,
) -> list[dict]:
"""Genera un calendario editoriale."""
if start_date:
try:
data_inizio = date.fromisoformat(start_date)
except ValueError:
data_inizio = date.today()
else:
data_inizio = date.today()
dates = self._generate_dates(data_inizio, num_posts)
if format_narrativo and awareness_level:
distribution = [(format_narrativo, awareness_level)] * num_posts
elif format_narrativo:
levels = _FORMATO_TO_LEVELS.get(format_narrativo, [2, 3, 4])
distribution = [
(format_narrativo, levels[i % len(levels)])
for i in range(num_posts)
]
elif awareness_level:
compatible_formats = [
fmt for fmt, levels in _FORMATO_TO_LEVELS.items()
if awareness_level in levels
]
if not compatible_formats:
compatible_formats = FORMATI_NARRATIVI
distribution = [
(compatible_formats[i % len(compatible_formats)], awareness_level)
for i in range(num_posts)
]
else:
distribution = [
_DEFAULT_DISTRIBUTION[i % len(_DEFAULT_DISTRIBUTION)]
for i in range(num_posts)
]
slots = []
for i in range(num_posts):
topic = topics[i % len(topics)] if topics else f"Topic {i + 1}"
fmt, level = distribution[i]
slots.append({
"indice": i,
"topic": topic,
"formato_narrativo": fmt,
"awareness_level": level,
"awareness_label": AWARENESS_LEVELS.get(level, f"Level {level}"),
"data_pubblicazione": dates[i].isoformat(),
"note": self._generate_note(fmt, level),
})
return slots
def get_formats(self) -> list[dict]:
"""Ritorna la lista dei formati narrativi disponibili."""
return [
{
"value": fmt,
"label": fmt.replace("_", " "),
"awareness_levels": _FORMATO_TO_LEVELS.get(fmt, [2, 3]),
}
for fmt in FORMATI_NARRATIVI
]
@staticmethod
def _generate_dates(start: date, count: int) -> list[date]:
"""Genera date di pubblicazione (lun, mer, ven)."""
publish_days = [0, 2, 4]
dates = []
current = start
while current.weekday() not in publish_days:
current += timedelta(days=1)
while len(dates) < count:
if current.weekday() in publish_days:
dates.append(current)
current += timedelta(days=1)
return dates
@staticmethod
def _generate_note(formato: str, level: int) -> str:
"""Genera una nota descrittiva per lo slot."""
level_label = AWARENESS_LEVELS.get(level, f"L{level}")
notes = {
"PAS": f"Problema-Agitazione-Soluzione. Target: {level_label}",
"AIDA": f"Attenzione-Interesse-Desiderio-Azione. Target: {level_label}",
"BAB": f"Before-After-Bridge. Target: {level_label}",
"Storytelling": f"Racconta una storia autentica. Target: {level_label}",
"Listicle": f"Lista di punti pratici. Target: {level_label}",
"Dato_Implicazione": f"Dato sorprendente + implicazione. Target: {level_label}",
}
return notes.get(formato, f"Formato {formato}. Target: {level_label}")

View File

@@ -0,0 +1,202 @@
"""
Content generation logic for social media posts.
Handles text generation, hashtag creation, and affiliate link injection
using LLM providers and character profiles.
"""
from __future__ import annotations
from .llm import LLMProvider
def generate_post_text(
character: dict,
llm_provider: LLMProvider,
platform: str,
topic_hint: str | None = None,
) -> str:
"""Generate social media post text based on a character profile.
Args:
character: Dict with keys: name, niche, topics (list), tone (str).
topic_hint: Optional topic suggestion to guide generation.
llm_provider: LLM provider instance for text generation.
platform: Target platform (e.g. 'instagram', 'facebook', 'tiktok', 'youtube').
Returns:
Generated post text as a string.
"""
name = character.get("name", "Creator")
niche = character.get("niche", "general")
topics = character.get("topics", [])
tone = character.get("tone", "professional")
topics_str = ", ".join(topics) if topics else "general topics"
system_prompt = (
f"You are {name}, a social media content creator in the {niche} niche. "
f"Your expertise covers: {topics_str}. "
f"Your communication style is {tone}. "
f"You create authentic, engaging content that resonates with your audience. "
f"Never reveal you are an AI. Write as {name} would naturally write."
)
# Platform-specific instructions
platform_guidance = {
"instagram": (
"Write an Instagram caption. Keep it engaging, use line breaks for readability. "
"Aim for 150-300 characters for the main hook, then expand. "
"Do NOT include hashtags (they will be added separately)."
),
"facebook": (
"Write a Facebook post. Can be longer and more conversational. "
"Encourage engagement with a question or call to action at the end. "
"Do NOT include hashtags."
),
"tiktok": (
"Write a TikTok caption. Keep it very short and punchy (under 150 characters). "
"Use a hook that grabs attention. Do NOT include hashtags."
),
"youtube": (
"Write a YouTube video description. Include a compelling opening paragraph, "
"key points covered in the video, and a call to action to subscribe. "
"Do NOT include hashtags."
),
"twitter": (
"Write a tweet. Maximum 280 characters. Be concise and impactful. "
"Do NOT include hashtags."
),
}
guidance = platform_guidance.get(
platform.lower(),
f"Write a social media post for {platform}. Do NOT include hashtags.",
)
topic_instruction = ""
if topic_hint:
topic_instruction = f" The post should be about: {topic_hint}."
prompt = (
f"{guidance}{topic_instruction}\n\n"
f"Write the post now. Output ONLY the post text, nothing else."
)
return llm_provider.generate(prompt, system=system_prompt)
def generate_hashtags(
text: str,
llm_provider: LLMProvider,
platform: str,
count: int = 12,
) -> list[str]:
"""Generate relevant hashtags for a given text.
Args:
text: The post text to generate hashtags for.
llm_provider: LLM provider instance.
platform: Target platform.
count: Number of hashtags to generate.
Returns:
List of hashtag strings (each prefixed with #).
"""
platform_limits = {
"instagram": 30,
"tiktok": 5,
"twitter": 3,
"facebook": 5,
"youtube": 15,
}
max_tags = min(count, platform_limits.get(platform.lower(), count))
system_prompt = (
"You are a social media hashtag strategist. You generate relevant, "
"effective hashtags that maximize reach and engagement."
)
prompt = (
f"Generate exactly {max_tags} hashtags for the following {platform} post.\n\n"
f"Post text:\n{text}\n\n"
f"Rules:\n"
f"- Mix popular (high reach) and niche (targeted) hashtags\n"
f"- Each hashtag must start with #\n"
f"- No spaces within hashtags, use CamelCase for multi-word\n"
f"- Output ONLY the hashtags, one per line, nothing else"
)
result = llm_provider.generate(prompt, system=system_prompt)
# Parse hashtags from the response
hashtags: list[str] = []
for line in result.strip().splitlines():
tag = line.strip()
if not tag:
continue
# Ensure it starts with #
if not tag.startswith("#"):
tag = f"#{tag}"
# Remove any trailing punctuation or spaces
tag = tag.split()[0] # Take only the first word if extra text
hashtags.append(tag)
return hashtags[:max_tags]
def inject_affiliate_links(
text: str,
affiliate_links: list[dict],
topics: list[str],
) -> tuple[str, list[dict]]:
"""Find relevant affiliate links and append them to the post text.
Matches affiliate links based on topic overlap. Links whose keywords
overlap with the provided topics are appended naturally at the end.
Args:
text: Original post text.
affiliate_links: List of dicts, each with keys:
- url (str): The affiliate URL
- label (str): Display text for the link
- keywords (list[str]): Topic keywords this link is relevant for
topics: Current post topics to match against.
Returns:
Tuple of (modified_text, links_used) where links_used is the list
of affiliate link dicts that were injected.
"""
if not affiliate_links or not topics:
return text, []
# Normalize topics to lowercase for matching
topics_lower = {t.lower() for t in topics}
# Score each link by keyword overlap
scored_links: list[tuple[int, dict]] = []
for link in affiliate_links:
keywords = link.get("keywords", [])
keywords_lower = {k.lower() for k in keywords}
overlap = len(topics_lower & keywords_lower)
if overlap > 0:
scored_links.append((overlap, link))
if not scored_links:
return text, []
# Sort by relevance (most overlap first), take top 2
scored_links.sort(key=lambda x: x[0], reverse=True)
top_links = [link for _, link in scored_links[:2]]
# Build the links section
links_section_parts: list[str] = []
for link in top_links:
label = link.get("label", "Check this out")
url = link.get("url", "")
links_section_parts.append(f"{label}: {url}")
links_text = "\n".join(links_section_parts)
modified_text = f"{text}\n\n{links_text}"
return modified_text, top_links

View File

@@ -0,0 +1,181 @@
"""
Image generation abstraction layer.
Supports DALL-E (OpenAI) and Replicate (Stability AI SDXL) for image generation.
"""
import time
from abc import ABC, abstractmethod
import httpx
TIMEOUT = 120.0
POLL_INTERVAL = 2.0
MAX_POLL_ATTEMPTS = 60
class ImageProvider(ABC):
"""Abstract base class for image generation providers."""
def __init__(self, api_key: str, model: str | None = None):
self.api_key = api_key
self.model = model
@abstractmethod
def generate(self, prompt: str, size: str = "1024x1024") -> str:
"""Generate an image from a text prompt.
Args:
prompt: Text description of the image to generate.
size: Image dimensions as 'WIDTHxHEIGHT' string.
Returns:
URL of the generated image.
"""
...
class DallEProvider(ImageProvider):
"""OpenAI DALL-E 3 image generation provider."""
API_URL = "https://api.openai.com/v1/images/generations"
def __init__(self, api_key: str, model: str | None = None):
super().__init__(api_key, model or "dall-e-3")
def generate(self, prompt: str, size: str = "1024x1024") -> str:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"prompt": prompt,
"n": 1,
"size": size,
"response_format": "url",
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(self.API_URL, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["data"][0]["url"]
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"DALL-E API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"DALL-E API request failed: {e}") from e
class ReplicateProvider(ImageProvider):
"""Replicate image generation provider using Stability AI SDXL."""
API_URL = "https://api.replicate.com/v1/predictions"
def __init__(self, api_key: str, model: str | None = None):
super().__init__(api_key, model or "stability-ai/sdxl:latest")
def generate(self, prompt: str, size: str = "1024x1024") -> str:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
# Parse size into width and height
try:
width, height = (int(d) for d in size.split("x"))
except ValueError:
width, height = 1024, 1024
# Determine the version string from the model
# Replicate expects "owner/model:version" or uses the version hash directly
version = self.model
payload = {
"version": version,
"input": {
"prompt": prompt,
"width": width,
"height": height,
},
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
# Create prediction
response = client.post(self.API_URL, headers=headers, json=payload)
response.raise_for_status()
prediction = response.json()
prediction_url = prediction.get("urls", {}).get("get")
if not prediction_url:
prediction_id = prediction.get("id")
prediction_url = f"{self.API_URL}/{prediction_id}"
# Poll for completion
for _ in range(MAX_POLL_ATTEMPTS):
poll_response = client.get(prediction_url, headers=headers)
poll_response.raise_for_status()
result = poll_response.json()
status = result.get("status")
if status == "succeeded":
output = result.get("output")
if isinstance(output, list) and output:
return output[0]
if isinstance(output, str):
return output
raise RuntimeError(
f"Replicate returned unexpected output format: {output}"
)
if status == "failed":
error = result.get("error", "Unknown error")
raise RuntimeError(f"Replicate prediction failed: {error}")
if status == "canceled":
raise RuntimeError("Replicate prediction was canceled")
time.sleep(POLL_INTERVAL)
raise RuntimeError(
"Replicate prediction timed out after polling"
)
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Replicate API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Replicate API request failed: {e}") from e
def get_image_provider(
provider_name: str, api_key: str, model: str | None = None
) -> ImageProvider:
"""Factory function to get an image generation provider instance.
Args:
provider_name: One of 'dalle', 'replicate'.
api_key: API key for the provider.
model: Optional model override. Uses default if not specified.
Returns:
An ImageProvider instance.
Raises:
ValueError: If provider_name is not supported.
"""
providers = {
"dalle": DallEProvider,
"replicate": ReplicateProvider,
}
provider_cls = providers.get(provider_name.lower())
if provider_cls is None:
supported = ", ".join(providers.keys())
raise ValueError(
f"Unknown image provider '{provider_name}'. Supported: {supported}"
)
return provider_cls(api_key=api_key, model=model)

194
backend/app/services/llm.py Normal file
View File

@@ -0,0 +1,194 @@
"""
Multi-LLM abstraction layer.
Supports Claude (Anthropic), OpenAI, and Gemini via direct HTTP calls using httpx.
Each provider implements the same interface for text generation.
"""
from abc import ABC, abstractmethod
import httpx
# Default models per provider
DEFAULT_MODELS = {
"claude": "claude-sonnet-4-20250514",
"openai": "gpt-4o-mini",
"gemini": "gemini-2.0-flash",
}
TIMEOUT = 60.0
class LLMProvider(ABC):
"""Abstract base class for LLM providers."""
def __init__(self, api_key: str, model: str | None = None):
self.api_key = api_key
self.model = model
@abstractmethod
def generate(self, prompt: str, system: str = "") -> str:
"""Generate text from a prompt.
Args:
prompt: The user prompt / message.
system: Optional system prompt for context and behavior.
Returns:
Generated text string.
"""
...
class ClaudeProvider(LLMProvider):
"""Anthropic Claude provider via Messages API."""
API_URL = "https://api.anthropic.com/v1/messages"
def __init__(self, api_key: str, model: str | None = None):
super().__init__(api_key, model or DEFAULT_MODELS["claude"])
def generate(self, prompt: str, system: str = "") -> str:
headers = {
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload: dict = {
"model": self.model,
"max_tokens": 2048,
"messages": [{"role": "user", "content": prompt}],
}
if system:
payload["system"] = system
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(self.API_URL, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
# Claude returns content as a list of content blocks
content_blocks = data.get("content", [])
return "".join(
block.get("text", "") for block in content_blocks if block.get("type") == "text"
)
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Claude API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Claude API request failed: {e}") from e
class OpenAIProvider(LLMProvider):
"""OpenAI provider via Chat Completions API."""
API_URL = "https://api.openai.com/v1/chat/completions"
def __init__(self, api_key: str, model: str | None = None):
super().__init__(api_key, model or DEFAULT_MODELS["openai"])
def generate(self, prompt: str, system: str = "") -> str:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
messages: list[dict] = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
payload = {
"model": self.model,
"messages": messages,
"max_tokens": 2048,
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(self.API_URL, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"OpenAI API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"OpenAI API request failed: {e}") from e
class GeminiProvider(LLMProvider):
"""Google Gemini provider via Generative Language API."""
API_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
def __init__(self, api_key: str, model: str | None = None):
super().__init__(api_key, model or DEFAULT_MODELS["gemini"])
def generate(self, prompt: str, system: str = "") -> str:
url = f"{self.API_BASE}/{self.model}:generateContent"
params = {"key": self.api_key}
headers = {"Content-Type": "application/json"}
# Build contents; Gemini uses a parts-based structure
parts: list[dict] = []
if system:
parts.append({"text": f"{system}\n\n{prompt}"})
else:
parts.append({"text": prompt})
payload = {
"contents": [{"parts": parts}],
"generationConfig": {
"maxOutputTokens": 2048,
},
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(url, params=params, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
candidates = data.get("candidates", [])
if not candidates:
return ""
content = candidates[0].get("content", {})
parts_out = content.get("parts", [])
return "".join(part.get("text", "") for part in parts_out)
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Gemini API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Gemini API request failed: {e}") from e
def get_llm_provider(
provider_name: str, api_key: str, model: str | None = None
) -> LLMProvider:
"""Factory function to get an LLM provider instance.
Args:
provider_name: One of 'claude', 'openai', 'gemini'.
api_key: API key for the provider.
model: Optional model override. Uses default if not specified.
Returns:
An LLMProvider instance.
Raises:
ValueError: If provider_name is not supported.
"""
providers = {
"claude": ClaudeProvider,
"openai": OpenAIProvider,
"gemini": GeminiProvider,
}
provider_cls = providers.get(provider_name.lower())
if provider_cls is None:
supported = ", ".join(providers.keys())
raise ValueError(
f"Unknown LLM provider '{provider_name}'. Supported: {supported}"
)
return provider_cls(api_key=api_key, model=model)

View File

@@ -0,0 +1,169 @@
"""PromptService — carica, lista e compila prompt .txt con variabili.
Gestisce i file .txt dei prompt LLM nella directory PROMPTS_PATH.
Usa la sintassi {{variabile}} per i placeholder (doppia graffa).
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Optional
# Pattern per trovare le variabili {{nome}} nei template
_VARIABLE_PATTERN = re.compile(r"\{\{(\w+)\}\}")
class PromptService:
"""Servizio per gestire i prompt .txt del sistema di generazione.
Fornisce metodi per:
- Elencare i prompt disponibili
- Caricare il contenuto di un prompt
- Compilare un prompt sostituendo le variabili {{...}}
- Salvare un prompt (per l'editor di Phase 2)
- Estrarre la lista di variabili richieste da un template
"""
def __init__(self, prompts_dir: Path) -> None:
"""Inizializza il servizio con la directory dei prompt.
Args:
prompts_dir: Path alla directory contenente i file .txt dei prompt.
Tipicamente PROMPTS_PATH da backend.config.
Raises:
FileNotFoundError: Se la directory non esiste.
"""
if not prompts_dir.exists():
raise FileNotFoundError(
f"Directory prompt non trovata: {prompts_dir}. "
"Verifica che PROMPTS_PATH sia configurato correttamente."
)
if not prompts_dir.is_dir():
raise NotADirectoryError(
f"Il percorso non è una directory: {prompts_dir}"
)
self._prompts_dir = prompts_dir
def list_prompts(self) -> list[str]:
"""Elenca tutti i prompt .txt disponibili nella directory.
Returns:
Lista di nomi file senza estensione, ordinata alfabeticamente.
Es: ['aida_promozione', 'bab_storytelling', 'system_prompt', ...]
"""
return sorted(
p.stem for p in self._prompts_dir.glob("*.txt") if p.is_file()
)
def load_prompt(self, name: str) -> str:
"""Carica il contenuto grezzo di un prompt .txt.
Args:
name: Nome del prompt senza estensione (es. "pas_valore")
Returns:
Contenuto testuale del file prompt
Raises:
FileNotFoundError: Se il file non esiste
"""
path = self._get_path(name)
if not path.exists():
available = self.list_prompts()
raise FileNotFoundError(
f"Prompt '{name}' non trovato in {self._prompts_dir}. "
f"Prompt disponibili: {available}"
)
return path.read_text(encoding="utf-8")
def compile_prompt(self, name: str, variables: dict[str, str]) -> str:
"""Carica un prompt e sostituisce tutte le variabili {{nome}} con i valori forniti.
Args:
name: Nome del prompt senza estensione
variables: Dizionario { nome_variabile: valore }
Returns:
Testo del prompt con tutte le variabili sostituite
Raises:
FileNotFoundError: Se il prompt non esiste
ValueError: Se una variabile nel template non ha corrispondenza nel dict
"""
template = self.load_prompt(name)
# Verifica che tutte le variabili del template siano nel dict
required = set(_VARIABLE_PATTERN.findall(template))
provided = set(variables.keys())
missing = required - provided
if missing:
raise ValueError(
f"Variabili mancanti per il prompt '{name}': {sorted(missing)}. "
f"Fornire: {sorted(required)}"
)
def replace_var(match: re.Match) -> str:
var_name = match.group(1)
return variables[var_name]
return _VARIABLE_PATTERN.sub(replace_var, template)
def save_prompt(self, name: str, content: str) -> None:
"""Salva il contenuto di un prompt nel file .txt.
Usato dall'editor di prompt in Phase 2.
Args:
name: Nome del prompt senza estensione
content: Contenuto testuale da salvare
Raises:
ValueError: Se il nome contiene caratteri non sicuri
"""
# Sicurezza: validazione nome file (solo lettere, cifre, underscore, trattino)
if not re.match(r"^[\w\-]+$", name):
raise ValueError(
f"Nome prompt non valido: '{name}'. "
"Usa solo lettere, cifre, underscore e trattino."
)
path = self._get_path(name)
path.write_text(content, encoding="utf-8")
def get_required_variables(self, name: str) -> list[str]:
"""Analizza il template e ritorna la lista delle variabili richieste.
Args:
name: Nome del prompt senza estensione
Returns:
Lista ordinata di nomi variabile (senza doppie graffe)
Es: ['brand_name', 'livello_schwartz', 'obiettivo_campagna', 'target_nicchia', 'topic']
Raises:
FileNotFoundError: Se il prompt non esiste
"""
template = self.load_prompt(name)
variables = sorted(set(_VARIABLE_PATTERN.findall(template)))
return variables
def prompt_exists(self, name: str) -> bool:
"""Verifica se un prompt esiste.
Args:
name: Nome del prompt senza estensione
Returns:
True se il file esiste
"""
return self._get_path(name).exists()
# ---------------------------------------------------------------------------
# Metodi privati
# ---------------------------------------------------------------------------
def _get_path(self, name: str) -> Path:
"""Costruisce il percorso completo per un file prompt."""
return self._prompts_dir / f"{name}.txt"

View File

@@ -0,0 +1,706 @@
"""
Social media publishing abstraction layer.
Supports Facebook, Instagram, YouTube, and TikTok via their respective APIs.
All HTTP calls use httpx (sync).
"""
from __future__ import annotations
import time
from abc import ABC, abstractmethod
import httpx
TIMEOUT = 120.0
UPLOAD_TIMEOUT = 300.0
class SocialPublisher(ABC):
"""Abstract base class for social media publishers."""
@abstractmethod
def publish_text(self, text: str, **kwargs) -> str:
"""Publish a text-only post.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""Publish an image post with caption.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video post with caption.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def get_comments(self, post_id: str) -> list[dict]:
"""Get comments on a post.
Returns:
List of comment dicts with at least 'id', 'text', 'author' keys.
"""
...
@abstractmethod
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a specific comment.
Returns:
True if reply was successful, False otherwise.
"""
...
class FacebookPublisher(SocialPublisher):
"""Facebook Page publishing via Graph API.
Required API setup:
- Create a Facebook App at https://developers.facebook.com
- Request 'pages_manage_posts', 'pages_read_engagement' permissions
- Get a Page Access Token (long-lived recommended)
- The page_id is the Facebook Page ID (numeric)
"""
GRAPH_API_BASE = "https://graph.facebook.com/v18.0"
def __init__(self, access_token: str, page_id: str):
self.access_token = access_token
self.page_id = page_id
def _request(
self,
method: str,
endpoint: str,
params: dict | None = None,
data: dict | None = None,
files: dict | None = None,
timeout: float = TIMEOUT,
) -> dict:
"""Make a request to the Facebook Graph API."""
url = f"{self.GRAPH_API_BASE}{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
try:
with httpx.Client(timeout=timeout) as client:
if method == "GET":
response = client.get(url, params=params)
elif files:
response = client.post(url, params=params, data=data, files=files)
else:
response = client.post(url, params=params, json=data)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Facebook API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Facebook API request failed: {e}") from e
def publish_text(self, text: str, **kwargs) -> str:
data = {"message": text}
result = self._request("POST", f"/{self.page_id}/feed", data=data)
return result.get("id", "")
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
data = {
"message": text,
"url": image_url,
}
result = self._request("POST", f"/{self.page_id}/photos", data=data)
return result.get("id", "")
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
with open(video_path, "rb") as video_file:
files = {"source": ("video.mp4", video_file, "video/mp4")}
form_data = {"description": text}
result = self._request(
"POST",
f"/{self.page_id}/videos",
data=form_data,
files=files,
timeout=UPLOAD_TIMEOUT,
)
return result.get("id", "")
def get_comments(self, post_id: str) -> list[dict]:
result = self._request("GET", f"/{post_id}/comments")
comments = []
for item in result.get("data", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("message", ""),
"author": item.get("from", {}).get("name", "Unknown"),
"created_at": item.get("created_time", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
try:
self._request("POST", f"/{comment_id}/comments", data={"message": text})
return True
except RuntimeError:
return False
class InstagramPublisher(SocialPublisher):
"""Instagram publishing via Instagram Graph API (Business/Creator accounts).
Required API setup:
- Facebook App with Instagram Graph API enabled
- Instagram Business or Creator account linked to a Facebook Page
- Permissions: 'instagram_basic', 'instagram_content_publish'
- ig_user_id is the Instagram Business Account ID (from Facebook Graph API)
- Note: Text-only posts are not supported by Instagram API
"""
GRAPH_API_BASE = "https://graph.facebook.com/v18.0"
def __init__(self, access_token: str, ig_user_id: str):
self.access_token = access_token
self.ig_user_id = ig_user_id
def _request(
self,
method: str,
endpoint: str,
params: dict | None = None,
data: dict | None = None,
timeout: float = TIMEOUT,
) -> dict:
"""Make a request to the Instagram Graph API."""
url = f"{self.GRAPH_API_BASE}{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
try:
with httpx.Client(timeout=timeout) as client:
if method == "GET":
response = client.get(url, params=params)
else:
response = client.post(url, params=params, json=data)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Instagram API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Instagram API request failed: {e}") from e
def publish_text(self, text: str, **kwargs) -> str:
"""Instagram does not support text-only posts.
Raises RuntimeError. Use publish_image or publish_video instead.
"""
raise RuntimeError(
"Instagram does not support text-only posts. "
"Use publish_image() or publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
# Step 1: Create media container
container_data = {
"image_url": image_url,
"caption": text,
}
container = self._request(
"POST", f"/{self.ig_user_id}/media", data=container_data
)
container_id = container.get("id", "")
if not container_id:
raise RuntimeError("Failed to create Instagram media container")
# Step 2: Publish the container
publish_data = {"creation_id": container_id}
result = self._request(
"POST", f"/{self.ig_user_id}/media_publish", data=publish_data
)
return result.get("id", "")
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video as a Reel on Instagram.
Note: video_path should be a publicly accessible URL for Instagram API.
For local files, upload to a hosting service first and pass the URL.
"""
video_url = kwargs.get("video_url", video_path)
# Step 1: Create media container for Reel
container_data = {
"media_type": "REELS",
"video_url": video_url,
"caption": text,
}
container = self._request(
"POST", f"/{self.ig_user_id}/media", data=container_data
)
container_id = container.get("id", "")
if not container_id:
raise RuntimeError("Failed to create Instagram video container")
# Step 2: Wait for video processing (poll status)
for _ in range(60):
status = self._request(
"GET",
f"/{container_id}",
params={"fields": "status_code"},
)
status_code = status.get("status_code", "")
if status_code == "FINISHED":
break
if status_code == "ERROR":
raise RuntimeError("Instagram video processing failed")
time.sleep(5)
else:
raise RuntimeError("Instagram video processing timed out")
# Step 3: Publish
publish_data = {"creation_id": container_id}
result = self._request(
"POST", f"/{self.ig_user_id}/media_publish", data=publish_data
)
return result.get("id", "")
def get_comments(self, post_id: str) -> list[dict]:
result = self._request(
"GET",
f"/{post_id}/comments",
params={"fields": "id,text,username,timestamp"},
)
comments = []
for item in result.get("data", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("text", ""),
"author": item.get("username", "Unknown"),
"created_at": item.get("timestamp", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
try:
self._request(
"POST",
f"/{comment_id}/replies",
data={"message": text},
)
return True
except RuntimeError:
return False
class YouTubePublisher(SocialPublisher):
"""YouTube publishing via YouTube Data API v3.
Required API setup:
- Google Cloud project with YouTube Data API v3 enabled
- OAuth 2.0 credentials (access_token from OAuth flow)
- Scopes: 'https://www.googleapis.com/auth/youtube.upload',
'https://www.googleapis.com/auth/youtube.force-ssl'
- Note: Uploads require OAuth, not just an API key
"""
API_BASE = "https://www.googleapis.com"
UPLOAD_URL = "https://www.googleapis.com/upload/youtube/v3/videos"
def __init__(self, access_token: str):
self.access_token = access_token
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.access_token}"}
def publish_text(self, text: str, **kwargs) -> str:
"""YouTube does not support text-only posts via Data API.
Consider using YouTube Community Posts API if available.
"""
raise RuntimeError(
"YouTube does not support text-only posts via the Data API. "
"Use publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""YouTube does not support image-only posts via Data API."""
raise RuntimeError(
"YouTube does not support image posts via the Data API. "
"Use publish_video() instead."
)
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Upload a video to YouTube using resumable upload.
Args:
text: Video description.
video_path: Path to the video file.
**kwargs: Additional options:
- title (str): Video title (default: first 100 chars of text)
- tags (list[str]): Video tags
- privacy (str): 'public', 'unlisted', or 'private' (default: 'public')
- category_id (str): YouTube category ID (default: '22' for People & Blogs)
"""
title = kwargs.get("title", text[:100])
tags = kwargs.get("tags", [])
privacy = kwargs.get("privacy", "public")
category_id = kwargs.get("category_id", "22")
# Step 1: Initialize resumable upload
metadata = {
"snippet": {
"title": title,
"description": text,
"tags": tags,
"categoryId": category_id,
},
"status": {
"privacyStatus": privacy,
},
}
headers = self._headers()
headers["Content-Type"] = "application/json"
try:
with httpx.Client(timeout=UPLOAD_TIMEOUT) as client:
# Init resumable upload
init_response = client.post(
self.UPLOAD_URL,
params={
"uploadType": "resumable",
"part": "snippet,status",
},
headers=headers,
json=metadata,
)
init_response.raise_for_status()
upload_url = init_response.headers.get("location")
if not upload_url:
raise RuntimeError(
"YouTube API did not return a resumable upload URL"
)
# Step 2: Upload the video file
with open(video_path, "rb") as video_file:
video_data = video_file.read()
upload_headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "video/*",
"Content-Length": str(len(video_data)),
}
upload_response = client.put(
upload_url,
headers=upload_headers,
content=video_data,
)
upload_response.raise_for_status()
result = upload_response.json()
return result.get("id", "")
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"YouTube API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"YouTube API request failed: {e}") from e
def get_comments(self, post_id: str) -> list[dict]:
"""Get comment threads for a video.
Args:
post_id: YouTube video ID.
"""
url = f"{self.API_BASE}/youtube/v3/commentThreads"
params = {
"part": "snippet",
"videoId": post_id,
"maxResults": 100,
"order": "time",
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.get(url, headers=self._headers(), params=params)
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"YouTube API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"YouTube API request failed: {e}") from e
comments = []
for item in data.get("items", []):
snippet = item.get("snippet", {}).get("topLevelComment", {}).get("snippet", {})
comments.append({
"id": item.get("snippet", {}).get("topLevelComment", {}).get("id", ""),
"text": snippet.get("textDisplay", ""),
"author": snippet.get("authorDisplayName", "Unknown"),
"created_at": snippet.get("publishedAt", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a YouTube comment.
Args:
comment_id: The parent comment ID to reply to.
text: Reply text.
"""
url = f"{self.API_BASE}/youtube/v3/comments"
params = {"part": "snippet"}
payload = {
"snippet": {
"parentId": comment_id,
"textOriginal": text,
}
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(
url, headers=self._headers(), params=params, json=payload
)
response.raise_for_status()
return True
except (httpx.HTTPStatusError, httpx.RequestError):
return False
class TikTokPublisher(SocialPublisher):
"""TikTok publishing via Content Posting API.
Required API setup:
- Register app at https://developers.tiktok.com
- Apply for 'Content Posting API' access
- OAuth 2.0 flow to get access_token
- Scopes: 'video.publish', 'video.upload'
- Note: TikTok API access requires app review and approval
- Text-only and image posts are not supported via API
"""
API_BASE = "https://open.tiktokapis.com/v2"
def __init__(self, access_token: str):
self.access_token = access_token
def _headers(self) -> dict:
return {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
def publish_text(self, text: str, **kwargs) -> str:
"""TikTok does not support text-only posts via API."""
raise RuntimeError(
"TikTok does not support text-only posts via the Content Posting API. "
"Use publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""TikTok image posting is limited. Use publish_video instead."""
raise RuntimeError(
"TikTok image posting is not widely supported via the API. "
"Use publish_video() instead."
)
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video to TikTok using the Content Posting API.
Args:
text: Video caption/description.
video_path: Path to the video file.
**kwargs: Additional options:
- privacy_level (str): 'PUBLIC_TO_EVERYONE', 'MUTUAL_FOLLOW_FRIENDS',
'FOLLOWER_OF_CREATOR', 'SELF_ONLY' (default: 'PUBLIC_TO_EVERYONE')
- disable_comment (bool): Disable comments (default: False)
- disable_duet (bool): Disable duet (default: False)
- disable_stitch (bool): Disable stitch (default: False)
"""
privacy_level = kwargs.get("privacy_level", "PUBLIC_TO_EVERYONE")
disable_comment = kwargs.get("disable_comment", False)
disable_duet = kwargs.get("disable_duet", False)
disable_stitch = kwargs.get("disable_stitch", False)
# Get file size for chunk upload
import os
file_size = os.path.getsize(video_path)
# Step 1: Initialize video upload
init_url = f"{self.API_BASE}/post/publish/video/init/"
init_payload = {
"post_info": {
"title": text[:150], # TikTok title limit
"privacy_level": privacy_level,
"disable_comment": disable_comment,
"disable_duet": disable_duet,
"disable_stitch": disable_stitch,
},
"source_info": {
"source": "FILE_UPLOAD",
"video_size": file_size,
"chunk_size": file_size, # Single chunk upload
"total_chunk_count": 1,
},
}
try:
with httpx.Client(timeout=UPLOAD_TIMEOUT) as client:
init_response = client.post(
init_url, headers=self._headers(), json=init_payload
)
init_response.raise_for_status()
init_data = init_response.json()
publish_id = init_data.get("data", {}).get("publish_id", "")
upload_url = init_data.get("data", {}).get("upload_url", "")
if not upload_url:
raise RuntimeError(
f"TikTok API did not return upload URL: {init_data}"
)
# Step 2: Upload the video file
with open(video_path, "rb") as video_file:
video_data = video_file.read()
upload_headers = {
"Content-Type": "video/mp4",
"Content-Range": f"bytes 0-{file_size - 1}/{file_size}",
}
upload_response = client.put(
upload_url,
headers=upload_headers,
content=video_data,
)
upload_response.raise_for_status()
return publish_id
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"TikTok API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"TikTok API request failed: {e}") from e
def get_comments(self, post_id: str) -> list[dict]:
"""Get comments on a TikTok video.
Note: Requires 'video.list' scope and comment read access.
"""
url = f"{self.API_BASE}/comment/list/"
payload = {
"video_id": post_id,
"max_count": 50,
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(url, headers=self._headers(), json=payload)
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"TikTok API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"TikTok API request failed: {e}") from e
comments = []
for item in data.get("data", {}).get("comments", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("text", ""),
"author": item.get("user", {}).get("display_name", "Unknown"),
"created_at": str(item.get("create_time", "")),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a TikTok comment.
Note: Comment reply functionality may be limited depending on API access level.
"""
url = f"{self.API_BASE}/comment/reply/"
payload = {
"comment_id": comment_id,
"text": text,
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(url, headers=self._headers(), json=payload)
response.raise_for_status()
return True
except (httpx.HTTPStatusError, httpx.RequestError):
return False
def get_publisher(
platform: str, access_token: str, **kwargs
) -> SocialPublisher:
"""Factory function to get a social media publisher instance.
Args:
platform: One of 'facebook', 'instagram', 'youtube', 'tiktok'.
access_token: OAuth access token for the platform.
**kwargs: Additional platform-specific arguments:
- facebook: page_id (str) - required
- instagram: ig_user_id (str) - required
Returns:
A SocialPublisher instance.
Raises:
ValueError: If platform is not supported or required kwargs are missing.
"""
platform_lower = platform.lower()
if platform_lower == "facebook":
page_id = kwargs.get("page_id")
if not page_id:
raise ValueError("FacebookPublisher requires 'page_id' parameter")
return FacebookPublisher(access_token=access_token, page_id=page_id)
elif platform_lower == "instagram":
ig_user_id = kwargs.get("ig_user_id")
if not ig_user_id:
raise ValueError("InstagramPublisher requires 'ig_user_id' parameter")
return InstagramPublisher(access_token=access_token, ig_user_id=ig_user_id)
elif platform_lower == "youtube":
return YouTubePublisher(access_token=access_token)
elif platform_lower == "tiktok":
return TikTokPublisher(access_token=access_token)
else:
supported = "facebook, instagram, youtube, tiktok"
raise ValueError(
f"Unknown platform '{platform}'. Supported: {supported}"
)