""" Social media publishing abstraction layer. Supports Facebook, Instagram, YouTube, and TikTok via their respective APIs. All HTTP calls use httpx (sync). """ from __future__ import annotations import time from abc import ABC, abstractmethod import httpx TIMEOUT = 120.0 UPLOAD_TIMEOUT = 300.0 class SocialPublisher(ABC): """Abstract base class for social media publishers.""" @abstractmethod def publish_text(self, text: str, **kwargs) -> str: """Publish a text-only post. Returns: External post ID from the platform. """ ... @abstractmethod def publish_image(self, text: str, image_url: str, **kwargs) -> str: """Publish an image post with caption. Returns: External post ID from the platform. """ ... @abstractmethod def publish_video(self, text: str, video_path: str, **kwargs) -> str: """Publish a video post with caption. Returns: External post ID from the platform. """ ... @abstractmethod def get_comments(self, post_id: str) -> list[dict]: """Get comments on a post. Returns: List of comment dicts with at least 'id', 'text', 'author' keys. """ ... @abstractmethod def reply_to_comment(self, comment_id: str, text: str) -> bool: """Reply to a specific comment. Returns: True if reply was successful, False otherwise. """ ... class FacebookPublisher(SocialPublisher): """Facebook Page publishing via Graph API. Required API setup: - Create a Facebook App at https://developers.facebook.com - Request 'pages_manage_posts', 'pages_read_engagement' permissions - Get a Page Access Token (long-lived recommended) - The page_id is the Facebook Page ID (numeric) """ GRAPH_API_BASE = "https://graph.facebook.com/v18.0" def __init__(self, access_token: str, page_id: str): self.access_token = access_token self.page_id = page_id def _request( self, method: str, endpoint: str, params: dict | None = None, data: dict | None = None, files: dict | None = None, timeout: float = TIMEOUT, ) -> dict: """Make a request to the Facebook Graph API.""" url = f"{self.GRAPH_API_BASE}{endpoint}" if params is None: params = {} params["access_token"] = self.access_token try: with httpx.Client(timeout=timeout) as client: if method == "GET": response = client.get(url, params=params) elif files: response = client.post(url, params=params, data=data, files=files) else: response = client.post(url, params=params, json=data) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise RuntimeError( f"Facebook API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"Facebook API request failed: {e}") from e def publish_text(self, text: str, **kwargs) -> str: data = {"message": text} result = self._request("POST", f"/{self.page_id}/feed", data=data) return result.get("id", "") def publish_image(self, text: str, image_url: str, **kwargs) -> str: data = { "message": text, "url": image_url, } result = self._request("POST", f"/{self.page_id}/photos", data=data) return result.get("id", "") def publish_video(self, text: str, video_path: str, **kwargs) -> str: with open(video_path, "rb") as video_file: files = {"source": ("video.mp4", video_file, "video/mp4")} form_data = {"description": text} result = self._request( "POST", f"/{self.page_id}/videos", data=form_data, files=files, timeout=UPLOAD_TIMEOUT, ) return result.get("id", "") def get_comments(self, post_id: str) -> list[dict]: result = self._request("GET", f"/{post_id}/comments") comments = [] for item in result.get("data", []): comments.append({ "id": item.get("id", ""), "text": item.get("message", ""), "author": item.get("from", {}).get("name", "Unknown"), "created_at": item.get("created_time", ""), }) return comments def reply_to_comment(self, comment_id: str, text: str) -> bool: try: self._request("POST", f"/{comment_id}/comments", data={"message": text}) return True except RuntimeError: return False class InstagramPublisher(SocialPublisher): """Instagram publishing via Instagram Graph API (Business/Creator accounts). Required API setup: - Facebook App with Instagram Graph API enabled - Instagram Business or Creator account linked to a Facebook Page - Permissions: 'instagram_basic', 'instagram_content_publish' - ig_user_id is the Instagram Business Account ID (from Facebook Graph API) - Note: Text-only posts are not supported by Instagram API """ GRAPH_API_BASE = "https://graph.facebook.com/v18.0" def __init__(self, access_token: str, ig_user_id: str): self.access_token = access_token self.ig_user_id = ig_user_id def _request( self, method: str, endpoint: str, params: dict | None = None, data: dict | None = None, timeout: float = TIMEOUT, ) -> dict: """Make a request to the Instagram Graph API.""" url = f"{self.GRAPH_API_BASE}{endpoint}" if params is None: params = {} params["access_token"] = self.access_token try: with httpx.Client(timeout=timeout) as client: if method == "GET": response = client.get(url, params=params) else: response = client.post(url, params=params, json=data) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise RuntimeError( f"Instagram API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"Instagram API request failed: {e}") from e def publish_text(self, text: str, **kwargs) -> str: """Instagram does not support text-only posts. Raises RuntimeError. Use publish_image or publish_video instead. """ raise RuntimeError( "Instagram does not support text-only posts. " "Use publish_image() or publish_video() instead." ) def publish_image(self, text: str, image_url: str, **kwargs) -> str: # Step 1: Create media container container_data = { "image_url": image_url, "caption": text, } container = self._request( "POST", f"/{self.ig_user_id}/media", data=container_data ) container_id = container.get("id", "") if not container_id: raise RuntimeError("Failed to create Instagram media container") # Step 2: Publish the container publish_data = {"creation_id": container_id} result = self._request( "POST", f"/{self.ig_user_id}/media_publish", data=publish_data ) return result.get("id", "") def publish_video(self, text: str, video_path: str, **kwargs) -> str: """Publish a video as a Reel on Instagram. Note: video_path should be a publicly accessible URL for Instagram API. For local files, upload to a hosting service first and pass the URL. """ video_url = kwargs.get("video_url", video_path) # Step 1: Create media container for Reel container_data = { "media_type": "REELS", "video_url": video_url, "caption": text, } container = self._request( "POST", f"/{self.ig_user_id}/media", data=container_data ) container_id = container.get("id", "") if not container_id: raise RuntimeError("Failed to create Instagram video container") # Step 2: Wait for video processing (poll status) for _ in range(60): status = self._request( "GET", f"/{container_id}", params={"fields": "status_code"}, ) status_code = status.get("status_code", "") if status_code == "FINISHED": break if status_code == "ERROR": raise RuntimeError("Instagram video processing failed") time.sleep(5) else: raise RuntimeError("Instagram video processing timed out") # Step 3: Publish publish_data = {"creation_id": container_id} result = self._request( "POST", f"/{self.ig_user_id}/media_publish", data=publish_data ) return result.get("id", "") def get_comments(self, post_id: str) -> list[dict]: result = self._request( "GET", f"/{post_id}/comments", params={"fields": "id,text,username,timestamp"}, ) comments = [] for item in result.get("data", []): comments.append({ "id": item.get("id", ""), "text": item.get("text", ""), "author": item.get("username", "Unknown"), "created_at": item.get("timestamp", ""), }) return comments def reply_to_comment(self, comment_id: str, text: str) -> bool: try: self._request( "POST", f"/{comment_id}/replies", data={"message": text}, ) return True except RuntimeError: return False class YouTubePublisher(SocialPublisher): """YouTube publishing via YouTube Data API v3. Required API setup: - Google Cloud project with YouTube Data API v3 enabled - OAuth 2.0 credentials (access_token from OAuth flow) - Scopes: 'https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube.force-ssl' - Note: Uploads require OAuth, not just an API key """ API_BASE = "https://www.googleapis.com" UPLOAD_URL = "https://www.googleapis.com/upload/youtube/v3/videos" def __init__(self, access_token: str): self.access_token = access_token def _headers(self) -> dict: return {"Authorization": f"Bearer {self.access_token}"} def publish_text(self, text: str, **kwargs) -> str: """YouTube does not support text-only posts via Data API. Consider using YouTube Community Posts API if available. """ raise RuntimeError( "YouTube does not support text-only posts via the Data API. " "Use publish_video() instead." ) def publish_image(self, text: str, image_url: str, **kwargs) -> str: """YouTube does not support image-only posts via Data API.""" raise RuntimeError( "YouTube does not support image posts via the Data API. " "Use publish_video() instead." ) def publish_video(self, text: str, video_path: str, **kwargs) -> str: """Upload a video to YouTube using resumable upload. Args: text: Video description. video_path: Path to the video file. **kwargs: Additional options: - title (str): Video title (default: first 100 chars of text) - tags (list[str]): Video tags - privacy (str): 'public', 'unlisted', or 'private' (default: 'public') - category_id (str): YouTube category ID (default: '22' for People & Blogs) """ title = kwargs.get("title", text[:100]) tags = kwargs.get("tags", []) privacy = kwargs.get("privacy", "public") category_id = kwargs.get("category_id", "22") # Step 1: Initialize resumable upload metadata = { "snippet": { "title": title, "description": text, "tags": tags, "categoryId": category_id, }, "status": { "privacyStatus": privacy, }, } headers = self._headers() headers["Content-Type"] = "application/json" try: with httpx.Client(timeout=UPLOAD_TIMEOUT) as client: # Init resumable upload init_response = client.post( self.UPLOAD_URL, params={ "uploadType": "resumable", "part": "snippet,status", }, headers=headers, json=metadata, ) init_response.raise_for_status() upload_url = init_response.headers.get("location") if not upload_url: raise RuntimeError( "YouTube API did not return a resumable upload URL" ) # Step 2: Upload the video file with open(video_path, "rb") as video_file: video_data = video_file.read() upload_headers = { "Authorization": f"Bearer {self.access_token}", "Content-Type": "video/*", "Content-Length": str(len(video_data)), } upload_response = client.put( upload_url, headers=upload_headers, content=video_data, ) upload_response.raise_for_status() result = upload_response.json() return result.get("id", "") except httpx.HTTPStatusError as e: raise RuntimeError( f"YouTube API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"YouTube API request failed: {e}") from e def get_comments(self, post_id: str) -> list[dict]: """Get comment threads for a video. Args: post_id: YouTube video ID. """ url = f"{self.API_BASE}/youtube/v3/commentThreads" params = { "part": "snippet", "videoId": post_id, "maxResults": 100, "order": "time", } try: with httpx.Client(timeout=TIMEOUT) as client: response = client.get(url, headers=self._headers(), params=params) response.raise_for_status() data = response.json() except httpx.HTTPStatusError as e: raise RuntimeError( f"YouTube API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"YouTube API request failed: {e}") from e comments = [] for item in data.get("items", []): snippet = item.get("snippet", {}).get("topLevelComment", {}).get("snippet", {}) comments.append({ "id": item.get("snippet", {}).get("topLevelComment", {}).get("id", ""), "text": snippet.get("textDisplay", ""), "author": snippet.get("authorDisplayName", "Unknown"), "created_at": snippet.get("publishedAt", ""), }) return comments def reply_to_comment(self, comment_id: str, text: str) -> bool: """Reply to a YouTube comment. Args: comment_id: The parent comment ID to reply to. text: Reply text. """ url = f"{self.API_BASE}/youtube/v3/comments" params = {"part": "snippet"} payload = { "snippet": { "parentId": comment_id, "textOriginal": text, } } try: with httpx.Client(timeout=TIMEOUT) as client: response = client.post( url, headers=self._headers(), params=params, json=payload ) response.raise_for_status() return True except (httpx.HTTPStatusError, httpx.RequestError): return False class TikTokPublisher(SocialPublisher): """TikTok publishing via Content Posting API. Required API setup: - Register app at https://developers.tiktok.com - Apply for 'Content Posting API' access - OAuth 2.0 flow to get access_token - Scopes: 'video.publish', 'video.upload' - Note: TikTok API access requires app review and approval - Text-only and image posts are not supported via API """ API_BASE = "https://open.tiktokapis.com/v2" def __init__(self, access_token: str): self.access_token = access_token def _headers(self) -> dict: return { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", } def publish_text(self, text: str, **kwargs) -> str: """TikTok does not support text-only posts via API.""" raise RuntimeError( "TikTok does not support text-only posts via the Content Posting API. " "Use publish_video() instead." ) def publish_image(self, text: str, image_url: str, **kwargs) -> str: """TikTok image posting is limited. Use publish_video instead.""" raise RuntimeError( "TikTok image posting is not widely supported via the API. " "Use publish_video() instead." ) def publish_video(self, text: str, video_path: str, **kwargs) -> str: """Publish a video to TikTok using the Content Posting API. Args: text: Video caption/description. video_path: Path to the video file. **kwargs: Additional options: - privacy_level (str): 'PUBLIC_TO_EVERYONE', 'MUTUAL_FOLLOW_FRIENDS', 'FOLLOWER_OF_CREATOR', 'SELF_ONLY' (default: 'PUBLIC_TO_EVERYONE') - disable_comment (bool): Disable comments (default: False) - disable_duet (bool): Disable duet (default: False) - disable_stitch (bool): Disable stitch (default: False) """ privacy_level = kwargs.get("privacy_level", "PUBLIC_TO_EVERYONE") disable_comment = kwargs.get("disable_comment", False) disable_duet = kwargs.get("disable_duet", False) disable_stitch = kwargs.get("disable_stitch", False) # Get file size for chunk upload import os file_size = os.path.getsize(video_path) # Step 1: Initialize video upload init_url = f"{self.API_BASE}/post/publish/video/init/" init_payload = { "post_info": { "title": text[:150], # TikTok title limit "privacy_level": privacy_level, "disable_comment": disable_comment, "disable_duet": disable_duet, "disable_stitch": disable_stitch, }, "source_info": { "source": "FILE_UPLOAD", "video_size": file_size, "chunk_size": file_size, # Single chunk upload "total_chunk_count": 1, }, } try: with httpx.Client(timeout=UPLOAD_TIMEOUT) as client: init_response = client.post( init_url, headers=self._headers(), json=init_payload ) init_response.raise_for_status() init_data = init_response.json() publish_id = init_data.get("data", {}).get("publish_id", "") upload_url = init_data.get("data", {}).get("upload_url", "") if not upload_url: raise RuntimeError( f"TikTok API did not return upload URL: {init_data}" ) # Step 2: Upload the video file with open(video_path, "rb") as video_file: video_data = video_file.read() upload_headers = { "Content-Type": "video/mp4", "Content-Range": f"bytes 0-{file_size - 1}/{file_size}", } upload_response = client.put( upload_url, headers=upload_headers, content=video_data, ) upload_response.raise_for_status() return publish_id except httpx.HTTPStatusError as e: raise RuntimeError( f"TikTok API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"TikTok API request failed: {e}") from e def get_comments(self, post_id: str) -> list[dict]: """Get comments on a TikTok video. Note: Requires 'video.list' scope and comment read access. """ url = f"{self.API_BASE}/comment/list/" payload = { "video_id": post_id, "max_count": 50, } try: with httpx.Client(timeout=TIMEOUT) as client: response = client.post(url, headers=self._headers(), json=payload) response.raise_for_status() data = response.json() except httpx.HTTPStatusError as e: raise RuntimeError( f"TikTok API error {e.response.status_code}: {e.response.text}" ) from e except httpx.RequestError as e: raise RuntimeError(f"TikTok API request failed: {e}") from e comments = [] for item in data.get("data", {}).get("comments", []): comments.append({ "id": item.get("id", ""), "text": item.get("text", ""), "author": item.get("user", {}).get("display_name", "Unknown"), "created_at": str(item.get("create_time", "")), }) return comments def reply_to_comment(self, comment_id: str, text: str) -> bool: """Reply to a TikTok comment. Note: Comment reply functionality may be limited depending on API access level. """ url = f"{self.API_BASE}/comment/reply/" payload = { "comment_id": comment_id, "text": text, } try: with httpx.Client(timeout=TIMEOUT) as client: response = client.post(url, headers=self._headers(), json=payload) response.raise_for_status() return True except (httpx.HTTPStatusError, httpx.RequestError): return False def get_publisher( platform: str, access_token: str, **kwargs ) -> SocialPublisher: """Factory function to get a social media publisher instance. Args: platform: One of 'facebook', 'instagram', 'youtube', 'tiktok'. access_token: OAuth access token for the platform. **kwargs: Additional platform-specific arguments: - facebook: page_id (str) - required - instagram: ig_user_id (str) - required Returns: A SocialPublisher instance. Raises: ValueError: If platform is not supported or required kwargs are missing. """ platform_lower = platform.lower() if platform_lower == "facebook": page_id = kwargs.get("page_id") if not page_id: raise ValueError("FacebookPublisher requires 'page_id' parameter") return FacebookPublisher(access_token=access_token, page_id=page_id) elif platform_lower == "instagram": ig_user_id = kwargs.get("ig_user_id") if not ig_user_id: raise ValueError("InstagramPublisher requires 'ig_user_id' parameter") return InstagramPublisher(access_token=access_token, ig_user_id=ig_user_id) elif platform_lower == "youtube": return YouTubePublisher(access_token=access_token) elif platform_lower == "tiktok": return TikTokPublisher(access_token=access_token) else: supported = "facebook, instagram, youtube, tiktok" raise ValueError( f"Unknown platform '{platform}'. Supported: {supported}" )