""" Background scheduler for Leopost Full. Handles automated content generation and post publishing. """ import logging from datetime import datetime from .database import SessionLocal from .models import EditorialPlan, Post, ScheduledPost, SocialAccount, SystemSetting from .services.content import generate_post_text, generate_hashtags from .services.llm import get_llm_provider from .services.social import get_publisher logger = logging.getLogger("leopost.scheduler") def check_and_publish(): """Check for posts that need publishing and publish them.""" db = SessionLocal() try: now = datetime.utcnow() pending = ( db.query(ScheduledPost) .filter( ScheduledPost.status == "pending", ScheduledPost.scheduled_at <= now, ) .all() ) for sp in pending: try: _publish_single(sp, db) except Exception as e: logger.error(f"Failed to publish post {sp.id}: {e}") sp.status = "failed" sp.error_message = str(e) db.commit() finally: db.close() def _publish_single(sp: ScheduledPost, db): """Publish a single scheduled post.""" post = db.query(Post).filter(Post.id == sp.post_id).first() if not post: sp.status = "failed" sp.error_message = "Post not found" db.commit() return # Find social account for this character + platform account = ( db.query(SocialAccount) .filter( SocialAccount.character_id == post.character_id, SocialAccount.platform == sp.platform, SocialAccount.is_active == True, ) .first() ) if not account: sp.status = "failed" sp.error_message = f"No active {sp.platform} account found" db.commit() return sp.status = "publishing" db.commit() try: kwargs = {} if account.page_id: kwargs["page_id"] = account.page_id if hasattr(account, "extra_data") and account.extra_data: kwargs.update(account.extra_data) publisher = get_publisher(sp.platform, account.access_token, **kwargs) if post.video_url: ext_id = publisher.publish_video(post.text_content or "", post.video_url) elif post.image_url: ext_id = publisher.publish_image(post.text_content or "", post.image_url) else: text = post.text_content or "" if post.hashtags: text += "\n\n" + " ".join(post.hashtags) ext_id = publisher.publish_text(text) sp.status = "published" sp.published_at = datetime.utcnow() sp.external_post_id = ext_id post.status = "published" db.commit() except Exception as e: sp.status = "failed" sp.error_message = str(e) db.commit() raise def generate_planned_content(): """Generate content for active editorial plans.""" db = SessionLocal() try: plans = ( db.query(EditorialPlan) .filter(EditorialPlan.is_active == True) .all() ) # Get LLM settings llm_setting = _get_setting(db, "llm_provider", "claude") api_key_setting = _get_setting(db, "llm_api_key", "") model_setting = _get_setting(db, "llm_model", None) if not api_key_setting: logger.warning("No LLM API key configured, skipping content generation") return for plan in plans: try: _generate_for_plan(plan, db, llm_setting, api_key_setting, model_setting) except Exception as e: logger.error(f"Failed to generate for plan {plan.id}: {e}") finally: db.close() def _generate_for_plan(plan, db, provider_name, api_key, model): """Generate content for a single plan.""" from .models import Character character = db.query(Character).filter(Character.id == plan.character_id).first() if not character: return provider = get_llm_provider(provider_name, api_key, model) char_dict = { "name": character.name, "niche": character.niche, "topics": character.topics or [], "tone": character.tone or "", } for platform in plan.platforms or []: for content_type in plan.content_types or ["text"]: text = generate_post_text(char_dict, provider, platform) hashtags = generate_hashtags(text, provider, platform) post = Post( character_id=character.id, content_type=content_type, text_content=text, hashtags=hashtags, llm_provider=provider_name, llm_model=model, platform_hint=platform, status="draft", ) db.add(post) db.commit() def _get_setting(db, key, default=None): """Get a system setting value.""" setting = db.query(SystemSetting).filter(SystemSetting.key == key).first() if setting: return setting.value return default