Files
leopost-full/backend/app/services/social.py
Michele 519a580679 Initial commit: Leopost Full — merge di Leopost, Post Generator e Autopilot OS
- Backend FastAPI con multi-LLM (Claude/OpenAI/Gemini)
- Publishing su Facebook, Instagram, YouTube, TikTok
- Calendario editoriale con awareness levels (PAS, AIDA, BAB...)
- Design system Editorial Fresh (Fraunces + DM Sans)
- Scheduler automatico, gestione commenti AI, affiliate links

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 17:23:16 +02:00

707 lines
24 KiB
Python

"""
Social media publishing abstraction layer.
Supports Facebook, Instagram, YouTube, and TikTok via their respective APIs.
All HTTP calls use httpx (sync).
"""
from __future__ import annotations
import time
from abc import ABC, abstractmethod
import httpx
TIMEOUT = 120.0
UPLOAD_TIMEOUT = 300.0
class SocialPublisher(ABC):
"""Abstract base class for social media publishers."""
@abstractmethod
def publish_text(self, text: str, **kwargs) -> str:
"""Publish a text-only post.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""Publish an image post with caption.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video post with caption.
Returns:
External post ID from the platform.
"""
...
@abstractmethod
def get_comments(self, post_id: str) -> list[dict]:
"""Get comments on a post.
Returns:
List of comment dicts with at least 'id', 'text', 'author' keys.
"""
...
@abstractmethod
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a specific comment.
Returns:
True if reply was successful, False otherwise.
"""
...
class FacebookPublisher(SocialPublisher):
"""Facebook Page publishing via Graph API.
Required API setup:
- Create a Facebook App at https://developers.facebook.com
- Request 'pages_manage_posts', 'pages_read_engagement' permissions
- Get a Page Access Token (long-lived recommended)
- The page_id is the Facebook Page ID (numeric)
"""
GRAPH_API_BASE = "https://graph.facebook.com/v18.0"
def __init__(self, access_token: str, page_id: str):
self.access_token = access_token
self.page_id = page_id
def _request(
self,
method: str,
endpoint: str,
params: dict | None = None,
data: dict | None = None,
files: dict | None = None,
timeout: float = TIMEOUT,
) -> dict:
"""Make a request to the Facebook Graph API."""
url = f"{self.GRAPH_API_BASE}{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
try:
with httpx.Client(timeout=timeout) as client:
if method == "GET":
response = client.get(url, params=params)
elif files:
response = client.post(url, params=params, data=data, files=files)
else:
response = client.post(url, params=params, json=data)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Facebook API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Facebook API request failed: {e}") from e
def publish_text(self, text: str, **kwargs) -> str:
data = {"message": text}
result = self._request("POST", f"/{self.page_id}/feed", data=data)
return result.get("id", "")
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
data = {
"message": text,
"url": image_url,
}
result = self._request("POST", f"/{self.page_id}/photos", data=data)
return result.get("id", "")
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
with open(video_path, "rb") as video_file:
files = {"source": ("video.mp4", video_file, "video/mp4")}
form_data = {"description": text}
result = self._request(
"POST",
f"/{self.page_id}/videos",
data=form_data,
files=files,
timeout=UPLOAD_TIMEOUT,
)
return result.get("id", "")
def get_comments(self, post_id: str) -> list[dict]:
result = self._request("GET", f"/{post_id}/comments")
comments = []
for item in result.get("data", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("message", ""),
"author": item.get("from", {}).get("name", "Unknown"),
"created_at": item.get("created_time", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
try:
self._request("POST", f"/{comment_id}/comments", data={"message": text})
return True
except RuntimeError:
return False
class InstagramPublisher(SocialPublisher):
"""Instagram publishing via Instagram Graph API (Business/Creator accounts).
Required API setup:
- Facebook App with Instagram Graph API enabled
- Instagram Business or Creator account linked to a Facebook Page
- Permissions: 'instagram_basic', 'instagram_content_publish'
- ig_user_id is the Instagram Business Account ID (from Facebook Graph API)
- Note: Text-only posts are not supported by Instagram API
"""
GRAPH_API_BASE = "https://graph.facebook.com/v18.0"
def __init__(self, access_token: str, ig_user_id: str):
self.access_token = access_token
self.ig_user_id = ig_user_id
def _request(
self,
method: str,
endpoint: str,
params: dict | None = None,
data: dict | None = None,
timeout: float = TIMEOUT,
) -> dict:
"""Make a request to the Instagram Graph API."""
url = f"{self.GRAPH_API_BASE}{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
try:
with httpx.Client(timeout=timeout) as client:
if method == "GET":
response = client.get(url, params=params)
else:
response = client.post(url, params=params, json=data)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"Instagram API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"Instagram API request failed: {e}") from e
def publish_text(self, text: str, **kwargs) -> str:
"""Instagram does not support text-only posts.
Raises RuntimeError. Use publish_image or publish_video instead.
"""
raise RuntimeError(
"Instagram does not support text-only posts. "
"Use publish_image() or publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
# Step 1: Create media container
container_data = {
"image_url": image_url,
"caption": text,
}
container = self._request(
"POST", f"/{self.ig_user_id}/media", data=container_data
)
container_id = container.get("id", "")
if not container_id:
raise RuntimeError("Failed to create Instagram media container")
# Step 2: Publish the container
publish_data = {"creation_id": container_id}
result = self._request(
"POST", f"/{self.ig_user_id}/media_publish", data=publish_data
)
return result.get("id", "")
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video as a Reel on Instagram.
Note: video_path should be a publicly accessible URL for Instagram API.
For local files, upload to a hosting service first and pass the URL.
"""
video_url = kwargs.get("video_url", video_path)
# Step 1: Create media container for Reel
container_data = {
"media_type": "REELS",
"video_url": video_url,
"caption": text,
}
container = self._request(
"POST", f"/{self.ig_user_id}/media", data=container_data
)
container_id = container.get("id", "")
if not container_id:
raise RuntimeError("Failed to create Instagram video container")
# Step 2: Wait for video processing (poll status)
for _ in range(60):
status = self._request(
"GET",
f"/{container_id}",
params={"fields": "status_code"},
)
status_code = status.get("status_code", "")
if status_code == "FINISHED":
break
if status_code == "ERROR":
raise RuntimeError("Instagram video processing failed")
time.sleep(5)
else:
raise RuntimeError("Instagram video processing timed out")
# Step 3: Publish
publish_data = {"creation_id": container_id}
result = self._request(
"POST", f"/{self.ig_user_id}/media_publish", data=publish_data
)
return result.get("id", "")
def get_comments(self, post_id: str) -> list[dict]:
result = self._request(
"GET",
f"/{post_id}/comments",
params={"fields": "id,text,username,timestamp"},
)
comments = []
for item in result.get("data", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("text", ""),
"author": item.get("username", "Unknown"),
"created_at": item.get("timestamp", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
try:
self._request(
"POST",
f"/{comment_id}/replies",
data={"message": text},
)
return True
except RuntimeError:
return False
class YouTubePublisher(SocialPublisher):
"""YouTube publishing via YouTube Data API v3.
Required API setup:
- Google Cloud project with YouTube Data API v3 enabled
- OAuth 2.0 credentials (access_token from OAuth flow)
- Scopes: 'https://www.googleapis.com/auth/youtube.upload',
'https://www.googleapis.com/auth/youtube.force-ssl'
- Note: Uploads require OAuth, not just an API key
"""
API_BASE = "https://www.googleapis.com"
UPLOAD_URL = "https://www.googleapis.com/upload/youtube/v3/videos"
def __init__(self, access_token: str):
self.access_token = access_token
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.access_token}"}
def publish_text(self, text: str, **kwargs) -> str:
"""YouTube does not support text-only posts via Data API.
Consider using YouTube Community Posts API if available.
"""
raise RuntimeError(
"YouTube does not support text-only posts via the Data API. "
"Use publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""YouTube does not support image-only posts via Data API."""
raise RuntimeError(
"YouTube does not support image posts via the Data API. "
"Use publish_video() instead."
)
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Upload a video to YouTube using resumable upload.
Args:
text: Video description.
video_path: Path to the video file.
**kwargs: Additional options:
- title (str): Video title (default: first 100 chars of text)
- tags (list[str]): Video tags
- privacy (str): 'public', 'unlisted', or 'private' (default: 'public')
- category_id (str): YouTube category ID (default: '22' for People & Blogs)
"""
title = kwargs.get("title", text[:100])
tags = kwargs.get("tags", [])
privacy = kwargs.get("privacy", "public")
category_id = kwargs.get("category_id", "22")
# Step 1: Initialize resumable upload
metadata = {
"snippet": {
"title": title,
"description": text,
"tags": tags,
"categoryId": category_id,
},
"status": {
"privacyStatus": privacy,
},
}
headers = self._headers()
headers["Content-Type"] = "application/json"
try:
with httpx.Client(timeout=UPLOAD_TIMEOUT) as client:
# Init resumable upload
init_response = client.post(
self.UPLOAD_URL,
params={
"uploadType": "resumable",
"part": "snippet,status",
},
headers=headers,
json=metadata,
)
init_response.raise_for_status()
upload_url = init_response.headers.get("location")
if not upload_url:
raise RuntimeError(
"YouTube API did not return a resumable upload URL"
)
# Step 2: Upload the video file
with open(video_path, "rb") as video_file:
video_data = video_file.read()
upload_headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "video/*",
"Content-Length": str(len(video_data)),
}
upload_response = client.put(
upload_url,
headers=upload_headers,
content=video_data,
)
upload_response.raise_for_status()
result = upload_response.json()
return result.get("id", "")
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"YouTube API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"YouTube API request failed: {e}") from e
def get_comments(self, post_id: str) -> list[dict]:
"""Get comment threads for a video.
Args:
post_id: YouTube video ID.
"""
url = f"{self.API_BASE}/youtube/v3/commentThreads"
params = {
"part": "snippet",
"videoId": post_id,
"maxResults": 100,
"order": "time",
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.get(url, headers=self._headers(), params=params)
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"YouTube API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"YouTube API request failed: {e}") from e
comments = []
for item in data.get("items", []):
snippet = item.get("snippet", {}).get("topLevelComment", {}).get("snippet", {})
comments.append({
"id": item.get("snippet", {}).get("topLevelComment", {}).get("id", ""),
"text": snippet.get("textDisplay", ""),
"author": snippet.get("authorDisplayName", "Unknown"),
"created_at": snippet.get("publishedAt", ""),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a YouTube comment.
Args:
comment_id: The parent comment ID to reply to.
text: Reply text.
"""
url = f"{self.API_BASE}/youtube/v3/comments"
params = {"part": "snippet"}
payload = {
"snippet": {
"parentId": comment_id,
"textOriginal": text,
}
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(
url, headers=self._headers(), params=params, json=payload
)
response.raise_for_status()
return True
except (httpx.HTTPStatusError, httpx.RequestError):
return False
class TikTokPublisher(SocialPublisher):
"""TikTok publishing via Content Posting API.
Required API setup:
- Register app at https://developers.tiktok.com
- Apply for 'Content Posting API' access
- OAuth 2.0 flow to get access_token
- Scopes: 'video.publish', 'video.upload'
- Note: TikTok API access requires app review and approval
- Text-only and image posts are not supported via API
"""
API_BASE = "https://open.tiktokapis.com/v2"
def __init__(self, access_token: str):
self.access_token = access_token
def _headers(self) -> dict:
return {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
def publish_text(self, text: str, **kwargs) -> str:
"""TikTok does not support text-only posts via API."""
raise RuntimeError(
"TikTok does not support text-only posts via the Content Posting API. "
"Use publish_video() instead."
)
def publish_image(self, text: str, image_url: str, **kwargs) -> str:
"""TikTok image posting is limited. Use publish_video instead."""
raise RuntimeError(
"TikTok image posting is not widely supported via the API. "
"Use publish_video() instead."
)
def publish_video(self, text: str, video_path: str, **kwargs) -> str:
"""Publish a video to TikTok using the Content Posting API.
Args:
text: Video caption/description.
video_path: Path to the video file.
**kwargs: Additional options:
- privacy_level (str): 'PUBLIC_TO_EVERYONE', 'MUTUAL_FOLLOW_FRIENDS',
'FOLLOWER_OF_CREATOR', 'SELF_ONLY' (default: 'PUBLIC_TO_EVERYONE')
- disable_comment (bool): Disable comments (default: False)
- disable_duet (bool): Disable duet (default: False)
- disable_stitch (bool): Disable stitch (default: False)
"""
privacy_level = kwargs.get("privacy_level", "PUBLIC_TO_EVERYONE")
disable_comment = kwargs.get("disable_comment", False)
disable_duet = kwargs.get("disable_duet", False)
disable_stitch = kwargs.get("disable_stitch", False)
# Get file size for chunk upload
import os
file_size = os.path.getsize(video_path)
# Step 1: Initialize video upload
init_url = f"{self.API_BASE}/post/publish/video/init/"
init_payload = {
"post_info": {
"title": text[:150], # TikTok title limit
"privacy_level": privacy_level,
"disable_comment": disable_comment,
"disable_duet": disable_duet,
"disable_stitch": disable_stitch,
},
"source_info": {
"source": "FILE_UPLOAD",
"video_size": file_size,
"chunk_size": file_size, # Single chunk upload
"total_chunk_count": 1,
},
}
try:
with httpx.Client(timeout=UPLOAD_TIMEOUT) as client:
init_response = client.post(
init_url, headers=self._headers(), json=init_payload
)
init_response.raise_for_status()
init_data = init_response.json()
publish_id = init_data.get("data", {}).get("publish_id", "")
upload_url = init_data.get("data", {}).get("upload_url", "")
if not upload_url:
raise RuntimeError(
f"TikTok API did not return upload URL: {init_data}"
)
# Step 2: Upload the video file
with open(video_path, "rb") as video_file:
video_data = video_file.read()
upload_headers = {
"Content-Type": "video/mp4",
"Content-Range": f"bytes 0-{file_size - 1}/{file_size}",
}
upload_response = client.put(
upload_url,
headers=upload_headers,
content=video_data,
)
upload_response.raise_for_status()
return publish_id
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"TikTok API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"TikTok API request failed: {e}") from e
def get_comments(self, post_id: str) -> list[dict]:
"""Get comments on a TikTok video.
Note: Requires 'video.list' scope and comment read access.
"""
url = f"{self.API_BASE}/comment/list/"
payload = {
"video_id": post_id,
"max_count": 50,
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(url, headers=self._headers(), json=payload)
response.raise_for_status()
data = response.json()
except httpx.HTTPStatusError as e:
raise RuntimeError(
f"TikTok API error {e.response.status_code}: {e.response.text}"
) from e
except httpx.RequestError as e:
raise RuntimeError(f"TikTok API request failed: {e}") from e
comments = []
for item in data.get("data", {}).get("comments", []):
comments.append({
"id": item.get("id", ""),
"text": item.get("text", ""),
"author": item.get("user", {}).get("display_name", "Unknown"),
"created_at": str(item.get("create_time", "")),
})
return comments
def reply_to_comment(self, comment_id: str, text: str) -> bool:
"""Reply to a TikTok comment.
Note: Comment reply functionality may be limited depending on API access level.
"""
url = f"{self.API_BASE}/comment/reply/"
payload = {
"comment_id": comment_id,
"text": text,
}
try:
with httpx.Client(timeout=TIMEOUT) as client:
response = client.post(url, headers=self._headers(), json=payload)
response.raise_for_status()
return True
except (httpx.HTTPStatusError, httpx.RequestError):
return False
def get_publisher(
platform: str, access_token: str, **kwargs
) -> SocialPublisher:
"""Factory function to get a social media publisher instance.
Args:
platform: One of 'facebook', 'instagram', 'youtube', 'tiktok'.
access_token: OAuth access token for the platform.
**kwargs: Additional platform-specific arguments:
- facebook: page_id (str) - required
- instagram: ig_user_id (str) - required
Returns:
A SocialPublisher instance.
Raises:
ValueError: If platform is not supported or required kwargs are missing.
"""
platform_lower = platform.lower()
if platform_lower == "facebook":
page_id = kwargs.get("page_id")
if not page_id:
raise ValueError("FacebookPublisher requires 'page_id' parameter")
return FacebookPublisher(access_token=access_token, page_id=page_id)
elif platform_lower == "instagram":
ig_user_id = kwargs.get("ig_user_id")
if not ig_user_id:
raise ValueError("InstagramPublisher requires 'ig_user_id' parameter")
return InstagramPublisher(access_token=access_token, ig_user_id=ig_user_id)
elif platform_lower == "youtube":
return YouTubePublisher(access_token=access_token)
elif platform_lower == "tiktok":
return TikTokPublisher(access_token=access_token)
else:
supported = "facebook, instagram, youtube, tiktok"
raise ValueError(
f"Unknown platform '{platform}'. Supported: {supported}"
)