From 709acd45a4b49ef3263b5cb1d1aa61ab6a566c2c Mon Sep 17 00:00:00 2001 From: Fanyang Meng Date: Mon, 17 Feb 2025 09:33:10 -0500 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20=5F=5Finit=5F?= =?UTF-8?q?=5F.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ghost_mcp/tools/__init__.py | 118 +++++++++++++++----------- src/ghost_mcp/tools/posts.py | 2 +- src/ghost_mcp/tools/users.py | 143 -------------------------------- 3 files changed, 71 insertions(+), 192 deletions(-) diff --git a/src/ghost_mcp/tools/__init__.py b/src/ghost_mcp/tools/__init__.py index 45767c5..d0488c3 100644 --- a/src/ghost_mcp/tools/__init__.py +++ b/src/ghost_mcp/tools/__init__.py @@ -1,54 +1,76 @@ -"""Ghost MCP tools package. +from .invites import create_invite +from .members import list_members, update_member, read_member, create_member +from .newsletters import list_newsletters, read_newsletter, create_newsletter, update_newsletter +from .offers import list_offers, read_offer, create_offer, update_offer +from .posts import ( + list_posts, + search_posts_by_title, + read_post, + create_post, + update_post, + delete_post, + batchly_update_posts, +) +from .roles import list_roles +from .tags import browse_tags, read_tag, create_tag, update_tag, delete_tag +from .tiers import list_tiers, read_tier, create_tier, update_tier +from .users import list_users, read_user, delete_user +from .webhooks import create_webhook, update_webhook, delete_webhook -This module dynamically imports all tools from Python files in this directory. -It automatically discovers and imports all non-private functions and variables, -making them available at the package level. - -When adding new tools: -1. Create new Python files in this directory -2. Define your tools as functions in these files -3. No need to modify this file - tools will be imported automatically -""" - -from importlib import import_module -from pathlib import Path -from typing import Dict, Any - -def _import_submodules() -> Dict[str, Any]: - """Dynamically import all modules from the current package. +__all__ = [ + # Invites + "create_invite", - Returns: - Dict mapping module names to imported module objects + # Members + "list_members", + "read_member", + "create_member", + "update_member", - Raises: - FileNotFoundError: If the directory doesn't exist - """ - current_dir = Path(__file__).parent + # Newsletters + "list_newsletters", + "read_newsletter", + "create_newsletter", + "update_newsletter", - if not current_dir.exists(): - raise FileNotFoundError(f"Tools directory not found at: {current_dir}") + # Offers + "list_offers", + "read_offer", + "create_offer", + "update_offer", - modules: Dict[str, Any] = {} - for py_file in current_dir.glob('*.py'): - if py_file.name.startswith('__'): - continue - - module_name = py_file.stem - - # Import the module - module = import_module(f".{module_name}", package="ghost_mcp.tools") - modules[module_name] = module - - # Get all non-private attributes - for attr_name in dir(module): - if not attr_name.startswith('_'): - # Add to the current module's namespace - globals()[attr_name] = getattr(module, attr_name) + # Posts + "list_posts", + "search_posts_by_title", + "read_post", + "create_post", + "update_post", + "delete_post", + "batchly_update_posts", - return modules - -# Run the dynamic imports -_import_submodules() - -# Create sorted __all__ from the imported attributes for consistent ordering -__all__ = sorted(name for name in globals() if not name.startswith('_')) + # Roles + "list_roles", + + # Tags + "browse_tags", + "read_tag", + "create_tag", + "update_tag", + "delete_tag", + + # Tiers + "list_tiers", + "read_tier", + "create_tier", + "update_tier", + + # Users + "list_users", + "read_user", + "delete_user", + + # Webhooks + "create_webhook", + "update_webhook", + "delete_webhook", +] diff --git a/src/ghost_mcp/tools/posts.py b/src/ghost_mcp/tools/posts.py index 24a8042..f68e6c0 100644 --- a/src/ghost_mcp/tools/posts.py +++ b/src/ghost_mcp/tools/posts.py @@ -543,7 +543,7 @@ Updated At: {post.get('updated_at', 'Unknown')} ctx.error(f"Failed to update post: {str(e)}") return str(e) -async def batchly_update_post(filter_criteria: dict, update_data: dict, ctx: Context = None) -> str: +async def batchly_update_posts(filter_criteria: dict, update_data: dict, ctx: Context = None) -> str: """Update multiple blog posts that match the filter criteria. Args: diff --git a/src/ghost_mcp/tools/users.py b/src/ghost_mcp/tools/users.py index f878e3e..ebd70fd 100644 --- a/src/ghost_mcp/tools/users.py +++ b/src/ghost_mcp/tools/users.py @@ -136,149 +136,6 @@ Email: {user.get('email', 'Unknown')} ctx.error(f"Failed to delete user: {str(e)}") raise -async def update_user( - user_id: str, - name: str = None, - slug: str = None, - email: str = None, - profile_image: str = None, - cover_image: str = None, - bio: str = None, - website: str = None, - location: str = None, - facebook: str = None, - twitter: str = None, - meta_title: str = None, - meta_description: str = None, - accessibility: str = None, - comment_notifications: bool = None, - free_member_signup_notification: bool = None, - paid_subscription_started_notification: bool = None, - paid_subscription_canceled_notification: bool = None, - mention_notifications: bool = None, - milestone_notifications: bool = None, - ctx: Context = None -) -> str: - """Update an existing user in Ghost. - - Args: - user_id: ID of the user to update (required) - name: User's full name (optional) - slug: User's slug (optional) - email: User's email address (optional) - profile_image: URL for profile image (optional) - cover_image: URL for cover image (optional) - bio: User's bio (optional) - website: User's website URL (optional) - location: User's location (optional) - facebook: Facebook username (optional) - twitter: Twitter username (optional) - meta_title: Meta title for SEO (optional) - meta_description: Meta description for SEO (optional) - accessibility: Accessibility settings (optional) - comment_notifications: Enable comment notifications (optional) - free_member_signup_notification: Enable free member signup notifications (optional) - paid_subscription_started_notification: Enable paid subscription started notifications (optional) - paid_subscription_canceled_notification: Enable paid subscription canceled notifications (optional) - mention_notifications: Enable mention notifications (optional) - milestone_notifications: Enable milestone notifications (optional) - ctx: Optional context for logging - - Returns: - String representation of the updated user - - Raises: - GhostError: If the Ghost API request fails - ValueError: If no fields to update are provided - """ - # Check if at least one field to update is provided - update_fields = { - 'name': name, - 'slug': slug, - 'email': email, - 'profile_image': profile_image, - 'cover_image': cover_image, - 'bio': bio, - 'website': website, - 'location': location, - 'facebook': facebook, - 'twitter': twitter, - 'meta_title': meta_title, - 'meta_description': meta_description, - 'accessibility': accessibility, - 'comment_notifications': comment_notifications, - 'free_member_signup_notification': free_member_signup_notification, - 'paid_subscription_started_notification': paid_subscription_started_notification, - 'paid_subscription_canceled_notification': paid_subscription_canceled_notification, - 'mention_notifications': mention_notifications, - 'milestone_notifications': milestone_notifications - } - - if not any(v is not None for v in update_fields.values()): - raise ValueError("At least one field must be provided to update") - - if ctx: - ctx.info(f"Updating user with ID: {user_id}") - - # Construct update data with only provided fields - update_data = {"users": [{}]} - user_updates = update_data["users"][0] - - for field, value in update_fields.items(): - if value is not None: - user_updates[field] = value - - try: - if ctx: - ctx.debug("Getting auth headers") - headers = await get_auth_headers(STAFF_API_KEY) - - if ctx: - ctx.debug(f"Making API request to update user {user_id}") - response = await make_ghost_request( - f"users/{user_id}/", - headers, - ctx, - http_method="PUT", - json_data=update_data - ) - - if ctx: - ctx.debug("Processing updated user response") - - user = response.get("users", [{}])[0] - roles = [role.get('name') for role in user.get('roles', [])] - - return f""" -User updated successfully: -Name: {user.get('name', 'Unknown')} -Email: {user.get('email', 'Unknown')} -Slug: {user.get('slug', 'Unknown')} -Status: {user.get('status', 'Unknown')} -Roles: {', '.join(roles)} -Location: {user.get('location', 'Not specified')} -Website: {user.get('website', 'None')} -Bio: {user.get('bio', 'No bio')} -Profile Image: {user.get('profile_image', 'None')} -Cover Image: {user.get('cover_image', 'None')} -Facebook: {user.get('facebook', 'None')} -Twitter: {user.get('twitter', 'None')} -Created: {user.get('created_at', 'Unknown')} -Updated: {user.get('updated_at', 'Unknown')} -Last Seen: {user.get('last_seen', 'Never')} -Notifications: -- Comments: {user.get('comment_notifications', False)} -- Free Member Signup: {user.get('free_member_signup_notification', False)} -- Paid Subscription Started: {user.get('paid_subscription_started_notification', False)} -- Paid Subscription Canceled: {user.get('paid_subscription_canceled_notification', False)} -- Mentions: {user.get('mention_notifications', False)} -- Milestones: {user.get('milestone_notifications', False)} -""" - except Exception as e: - if ctx: - ctx.error(f"Failed to update user: {str(e)}") - raise - async def read_user(user_id: str, ctx: Context = None) -> str: """Get the details of a specific user.