From f34f9c67ed39b9a2e409e4f5677f573565c50bde Mon Sep 17 00:00:00 2001 From: Fanyang Meng Date: Tue, 11 Feb 2025 18:13:27 -0500 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Support=20updating=20a=20post?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ghost_mcp/api.py | 13 ++- src/ghost_mcp/server.py | 13 +-- src/ghost_mcp/tools.py | 230 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 236 insertions(+), 20 deletions(-) diff --git a/src/ghost_mcp/api.py b/src/ghost_mcp/api.py index 37b9698..b75b0b4 100644 --- a/src/ghost_mcp/api.py +++ b/src/ghost_mcp/api.py @@ -63,7 +63,9 @@ async def make_ghost_request( endpoint: str, headers: Dict[str, str], ctx: Context = None, - is_resource: bool = False + is_resource: bool = False, + http_method: str = "GET", + json_data: Dict[str, Any] = None ) -> Dict[str, Any]: """Make an authenticated request to the Ghost API. @@ -72,6 +74,8 @@ async def make_ghost_request( headers: Request headers ctx: Optional context for logging (not used for resources) is_resource: Whether this request is for a resource + http_method: HTTP method to use (GET or PUT) + json_data: JSON data to send with PUT requests Returns: Parsed JSON response @@ -82,11 +86,14 @@ async def make_ghost_request( # Ensure clean URL construction with proper trailing slashes base_url = f"{API_URL.rstrip('/')}/ghost/api/admin" endpoint = endpoint.strip('/') - url = f"{base_url}/{endpoint}/" + url = f"{base_url}/{endpoint}" # Remove trailing slash for PUT requests to work correctly async with httpx.AsyncClient(follow_redirects=True) as client: try: - response = await client.get(url, headers=headers) + if http_method == "PUT": + response = await client.put(url, headers=headers, json=json_data) + else: # Default to GET + response = await client.get(url, headers=headers) response.raise_for_status() if not is_resource and ctx: ctx.log("info", f"API Request to {url} successful") diff --git a/src/ghost_mcp/server.py b/src/ghost_mcp/server.py index cdbd2cb..27f1968 100644 --- a/src/ghost_mcp/server.py +++ b/src/ghost_mcp/server.py @@ -41,19 +41,20 @@ def create_server() -> FastMCP: mcp.resource("post://{post_id}")(resources.handle_post_resource) mcp.resource("blog://info")(resources.handle_blog_info) - # Register tools - mcp.tool()(tools.list_users) - mcp.tool()(tools.list_members) - mcp.tool()(tools.list_tiers) - mcp.tool()(tools.list_offers) - mcp.tool()(tools.list_newsletters) + # Register tools with preprocessing for create_post mcp.tool()(tools.list_posts) mcp.tool()(tools.read_post) + mcp.tool()(tools.update_post) mcp.tool()(tools.search_posts_by_title) + mcp.tool()(tools.list_users) mcp.tool()(tools.read_user) + mcp.tool()(tools.list_members) mcp.tool()(tools.read_member) + mcp.tool()(tools.list_tiers) mcp.tool()(tools.read_tier) + mcp.tool()(tools.list_offers) mcp.tool()(tools.read_offer) + mcp.tool()(tools.list_newsletters) mcp.tool()(tools.read_newsletter) # Register prompts diff --git a/src/ghost_mcp/tools.py b/src/ghost_mcp/tools.py index fae8c3f..1186325 100644 --- a/src/ghost_mcp/tools.py +++ b/src/ghost_mcp/tools.py @@ -683,14 +683,21 @@ ID: {post.get('id', 'Unknown')} return str(e) async def read_post(post_id: str, ctx: Context = None) -> str: - """Get the full content of a specific blog post. + """Get the full content and metadata of a specific blog post. Args: post_id: The ID of the post to retrieve ctx: Optional context for logging Returns: - Formatted string containing the full post content + Formatted string containing all post details including: + - Basic info (title, slug, status, etc) + - Content in both HTML and Lexical formats + - Feature image details + - Meta fields (SEO, Open Graph, Twitter) + - Authors and tags + - Email settings + - Timestamps Raises: GhostError: If there is an error accessing the Ghost API @@ -706,7 +713,7 @@ async def read_post(post_id: str, ctx: Context = None) -> str: if ctx: ctx.debug(f"Making API request to /posts/{post_id}/") data = await make_ghost_request( - f"posts/{post_id}/?formats=html,plaintext&include=html,plaintext", + f"posts/{post_id}/?formats=html,lexical&include=tags,authors", headers, ctx ) @@ -715,22 +722,223 @@ async def read_post(post_id: str, ctx: Context = None) -> str: ctx.debug("Processing post response data") post = data["posts"][0] - content = post.get('html') or post.get('plaintext') or 'No content available' + + # Format tags and authors + tags = [tag.get('name', 'Unknown') for tag in post.get('tags', [])] + authors = [author.get('name', 'Unknown') for author in post.get('authors', [])] + + # Get content + html_content = post.get('html', 'No HTML content available') + lexical_content = post.get('lexical', 'No Lexical content available') return f""" - Title: {post.get('title', 'Untitled')} - Status: {post.get('status', 'Unknown')} - URL: {post.get('url', 'No URL')} - Created: {post.get('created_at', 'Unknown')} +Post Details: - Content: - {content} - """ +Basic Information: +Title: {post.get('title', 'Untitled')} +Slug: {post.get('slug', 'No slug')} +Status: {post.get('status', 'Unknown')} +Visibility: {post.get('visibility', 'Unknown')} +Featured: {post.get('featured', False)} +URL: {post.get('url', 'No URL')} + +Content Formats: +HTML Content: +{html_content} + +Lexical Content: +{lexical_content} + +Images: +Feature Image: {post.get('feature_image', 'None')} +Feature Image Alt: {post.get('feature_image_alt', 'None')} +Feature Image Caption: {post.get('feature_image_caption', 'None')} + +Meta Information: +Meta Title: {post.get('meta_title', 'None')} +Meta Description: {post.get('meta_description', 'None')} +Canonical URL: {post.get('canonical_url', 'None')} +Custom Excerpt: {post.get('custom_excerpt', 'None')} + +Open Graph: +OG Image: {post.get('og_image', 'None')} +OG Title: {post.get('og_title', 'None')} +OG Description: {post.get('og_description', 'None')} + +Twitter Card: +Twitter Image: {post.get('twitter_image', 'None')} +Twitter Title: {post.get('twitter_title', 'None')} +Twitter Description: {post.get('twitter_description', 'None')} + +Code Injection: +Header Code: {post.get('codeinjection_head', 'None')} +Footer Code: {post.get('codeinjection_foot', 'None')} + +Template: +Custom Template: {post.get('custom_template', 'None')} + +Relationships: +Tags: {', '.join(tags) if tags else 'None'} +Authors: {', '.join(authors) if authors else 'None'} + +Email Settings: +Email Only: {post.get('email_only', False)} +Email Subject: {post.get('email', {}).get('subject', 'None')} + +Timestamps: +Created: {post.get('created_at', 'Unknown')} +Updated: {post.get('updated_at', 'Unknown')} +Published: {post.get('published_at', 'Not published')} + +System IDs: +ID: {post.get('id', 'Unknown')} +UUID: {post.get('uuid', 'Unknown')} +""" except GhostError as e: if ctx: ctx.error(f"Failed to read post: {str(e)}") return str(e) +async def update_post(post_id: str, update_data: dict, ctx: Context = None) -> str: + """Update a blog post using lexical content. + + Args: + post_id: The ID of the post to update + update_data: Dictionary containing the lexical content and updated_at timestamp. + Expected to have at least 'lexical' and 'updated_at' keys. + The lexical content must be a properly escaped JSON string in this format: + { + "root": { + "children": [ + { + "children": [ + { + "detail": 0, + "format": 0, + "mode": "normal", + "style": "", + "text": "Your content here", + "type": "text", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "paragraph", + "version": 1 + } + ], + "direction": "ltr", + "format": "", + "indent": 0, + "type": "root", + "version": 1 + } + } + + Example usage: + update_data = { + "post_id": "67abcffb7f82ac000179d76f", + "update_data": { + "lexical": "{\"root\":{\"children\":[{\"children\":[{\"detail\":0,\"format\":0,\"mode\":\"normal\",\"style\":\"\",\"text\":\"Hello World\",\"type\":\"text\",\"version\":1}],\"direction\":\"ltr\",\"format\":\"\",\"indent\":0,\"type\":\"paragraph\",\"version\":1}],\"direction\":\"ltr\",\"format\":\"\",\"indent\":0,\"type\":\"root\",\"version\":1}}", + "updated_at": "2025-02-11T22:54:40.000Z" + } + } + ctx: Optional context for logging + + Returns: + Formatted string containing the updated post details + + Raises: + GhostError: If there is an error accessing the Ghost API or missing required fields + """ + if ctx: + ctx.info(f"Updating post with ID: {post_id}") + + if 'updated_at' not in update_data: + error_msg = "updated_at field is required for post updates" + if ctx: + ctx.error(error_msg) + return error_msg + + if 'lexical' not in update_data: + error_msg = "lexical field is required for post updates" + if ctx: + ctx.error(error_msg) + return error_msg + + try: + if ctx: + ctx.debug("Getting auth headers") + headers = await get_auth_headers(STAFF_API_KEY) + + # Prepare update payload with lexical content + post_update = { + "posts": [{ + "id": post_id, + "updated_at": update_data["updated_at"], + "lexical": update_data["lexical"] + }] + } + + # Optionally handle tag updates if provided + import copy + update_fields = copy.deepcopy(update_data) + if "tags" in update_fields: + tags = update_fields.pop("tags") + post_update["posts"][0]["tags"] = [{"name": tag} if isinstance(tag, str) else tag for tag in tags] + + # Copy any additional fields + for key, value in update_fields.items(): + if key not in ["updated_at", "lexical"]: + post_update["posts"][0][key] = value + + if ctx: + ctx.debug(f"Update payload: {json.dumps(post_update, indent=2)}") + + if ctx: + ctx.debug(f"Making PUT request to /posts/{post_id}/") + data = await make_ghost_request( + f"posts/{post_id}/?formats=lexical&include=tags,authors", + headers, + ctx, + http_method="PUT", + json_data=post_update + ) + + if ctx: + ctx.debug(f"API Response: {json.dumps(data, indent=2)}") + + post = data["posts"][0] + + # Format tags and authors for display + tags = [tag.get('name', 'Unknown') for tag in post.get('tags', [])] + authors = [author.get('name', 'Unknown') for author in post.get('authors', [])] + + # Get content preview (lexical content) + lexical_content = post.get('lexical', '') + content_preview = lexical_content[:200] + "..." if lexical_content else 'No content available' + + return f""" +Post Updated Successfully: +Title: {post.get('title', 'Untitled')} +Slug: {post.get('slug', 'No slug')} +Status: {post.get('status', 'Unknown')} +Visibility: {post.get('visibility', 'Unknown')} +Featured: {post.get('featured', False)} +URL: {post.get('url', 'No URL')} +Tags: {', '.join(tags) if tags else 'None'} +Authors: {', '.join(authors) if authors else 'None'} +Published At: {post.get('published_at', 'Not published')} +Updated At: {post.get('updated_at', 'Unknown')} +Content Preview: {content_preview} +""" + except GhostError as e: + if ctx: + ctx.error(f"Failed to update post: {str(e)}") + return str(e) + async def search_posts_by_title(query: str, exact: bool = False, ctx: Context = None) -> str: """Search for posts by title.