Support updating a post

This commit is contained in:
Fanyang Meng
2025-02-11 18:13:27 -05:00
parent a78898844f
commit f34f9c67ed
3 changed files with 236 additions and 20 deletions

View File

@@ -63,7 +63,9 @@ async def make_ghost_request(
endpoint: str, endpoint: str,
headers: Dict[str, str], headers: Dict[str, str],
ctx: Context = None, ctx: Context = None,
is_resource: bool = False is_resource: bool = False,
http_method: str = "GET",
json_data: Dict[str, Any] = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Make an authenticated request to the Ghost API. """Make an authenticated request to the Ghost API.
@@ -72,6 +74,8 @@ async def make_ghost_request(
headers: Request headers headers: Request headers
ctx: Optional context for logging (not used for resources) ctx: Optional context for logging (not used for resources)
is_resource: Whether this request is for a resource is_resource: Whether this request is for a resource
http_method: HTTP method to use (GET or PUT)
json_data: JSON data to send with PUT requests
Returns: Returns:
Parsed JSON response Parsed JSON response
@@ -82,10 +86,13 @@ async def make_ghost_request(
# Ensure clean URL construction with proper trailing slashes # Ensure clean URL construction with proper trailing slashes
base_url = f"{API_URL.rstrip('/')}/ghost/api/admin" base_url = f"{API_URL.rstrip('/')}/ghost/api/admin"
endpoint = endpoint.strip('/') endpoint = endpoint.strip('/')
url = f"{base_url}/{endpoint}/" url = f"{base_url}/{endpoint}" # Remove trailing slash for PUT requests to work correctly
async with httpx.AsyncClient(follow_redirects=True) as client: async with httpx.AsyncClient(follow_redirects=True) as client:
try: try:
if http_method == "PUT":
response = await client.put(url, headers=headers, json=json_data)
else: # Default to GET
response = await client.get(url, headers=headers) response = await client.get(url, headers=headers)
response.raise_for_status() response.raise_for_status()
if not is_resource and ctx: if not is_resource and ctx:

View File

@@ -41,19 +41,20 @@ def create_server() -> FastMCP:
mcp.resource("post://{post_id}")(resources.handle_post_resource) mcp.resource("post://{post_id}")(resources.handle_post_resource)
mcp.resource("blog://info")(resources.handle_blog_info) mcp.resource("blog://info")(resources.handle_blog_info)
# Register tools # Register tools with preprocessing for create_post
mcp.tool()(tools.list_users)
mcp.tool()(tools.list_members)
mcp.tool()(tools.list_tiers)
mcp.tool()(tools.list_offers)
mcp.tool()(tools.list_newsletters)
mcp.tool()(tools.list_posts) mcp.tool()(tools.list_posts)
mcp.tool()(tools.read_post) mcp.tool()(tools.read_post)
mcp.tool()(tools.update_post)
mcp.tool()(tools.search_posts_by_title) mcp.tool()(tools.search_posts_by_title)
mcp.tool()(tools.list_users)
mcp.tool()(tools.read_user) mcp.tool()(tools.read_user)
mcp.tool()(tools.list_members)
mcp.tool()(tools.read_member) mcp.tool()(tools.read_member)
mcp.tool()(tools.list_tiers)
mcp.tool()(tools.read_tier) mcp.tool()(tools.read_tier)
mcp.tool()(tools.list_offers)
mcp.tool()(tools.read_offer) mcp.tool()(tools.read_offer)
mcp.tool()(tools.list_newsletters)
mcp.tool()(tools.read_newsletter) mcp.tool()(tools.read_newsletter)
# Register prompts # Register prompts

View File

@@ -683,14 +683,21 @@ ID: {post.get('id', 'Unknown')}
return str(e) return str(e)
async def read_post(post_id: str, ctx: Context = None) -> str: async def read_post(post_id: str, ctx: Context = None) -> str:
"""Get the full content of a specific blog post. """Get the full content and metadata of a specific blog post.
Args: Args:
post_id: The ID of the post to retrieve post_id: The ID of the post to retrieve
ctx: Optional context for logging ctx: Optional context for logging
Returns: Returns:
Formatted string containing the full post content Formatted string containing all post details including:
- Basic info (title, slug, status, etc)
- Content in both HTML and Lexical formats
- Feature image details
- Meta fields (SEO, Open Graph, Twitter)
- Authors and tags
- Email settings
- Timestamps
Raises: Raises:
GhostError: If there is an error accessing the Ghost API GhostError: If there is an error accessing the Ghost API
@@ -706,7 +713,7 @@ async def read_post(post_id: str, ctx: Context = None) -> str:
if ctx: if ctx:
ctx.debug(f"Making API request to /posts/{post_id}/") ctx.debug(f"Making API request to /posts/{post_id}/")
data = await make_ghost_request( data = await make_ghost_request(
f"posts/{post_id}/?formats=html,plaintext&include=html,plaintext", f"posts/{post_id}/?formats=html,lexical&include=tags,authors",
headers, headers,
ctx ctx
) )
@@ -715,22 +722,223 @@ async def read_post(post_id: str, ctx: Context = None) -> str:
ctx.debug("Processing post response data") ctx.debug("Processing post response data")
post = data["posts"][0] post = data["posts"][0]
content = post.get('html') or post.get('plaintext') or 'No content available'
# Format tags and authors
tags = [tag.get('name', 'Unknown') for tag in post.get('tags', [])]
authors = [author.get('name', 'Unknown') for author in post.get('authors', [])]
# Get content
html_content = post.get('html', 'No HTML content available')
lexical_content = post.get('lexical', 'No Lexical content available')
return f""" return f"""
Title: {post.get('title', 'Untitled')} Post Details:
Status: {post.get('status', 'Unknown')}
URL: {post.get('url', 'No URL')}
Created: {post.get('created_at', 'Unknown')}
Content: Basic Information:
{content} Title: {post.get('title', 'Untitled')}
Slug: {post.get('slug', 'No slug')}
Status: {post.get('status', 'Unknown')}
Visibility: {post.get('visibility', 'Unknown')}
Featured: {post.get('featured', False)}
URL: {post.get('url', 'No URL')}
Content Formats:
HTML Content:
{html_content}
Lexical Content:
{lexical_content}
Images:
Feature Image: {post.get('feature_image', 'None')}
Feature Image Alt: {post.get('feature_image_alt', 'None')}
Feature Image Caption: {post.get('feature_image_caption', 'None')}
Meta Information:
Meta Title: {post.get('meta_title', 'None')}
Meta Description: {post.get('meta_description', 'None')}
Canonical URL: {post.get('canonical_url', 'None')}
Custom Excerpt: {post.get('custom_excerpt', 'None')}
Open Graph:
OG Image: {post.get('og_image', 'None')}
OG Title: {post.get('og_title', 'None')}
OG Description: {post.get('og_description', 'None')}
Twitter Card:
Twitter Image: {post.get('twitter_image', 'None')}
Twitter Title: {post.get('twitter_title', 'None')}
Twitter Description: {post.get('twitter_description', 'None')}
Code Injection:
Header Code: {post.get('codeinjection_head', 'None')}
Footer Code: {post.get('codeinjection_foot', 'None')}
Template:
Custom Template: {post.get('custom_template', 'None')}
Relationships:
Tags: {', '.join(tags) if tags else 'None'}
Authors: {', '.join(authors) if authors else 'None'}
Email Settings:
Email Only: {post.get('email_only', False)}
Email Subject: {post.get('email', {}).get('subject', 'None')}
Timestamps:
Created: {post.get('created_at', 'Unknown')}
Updated: {post.get('updated_at', 'Unknown')}
Published: {post.get('published_at', 'Not published')}
System IDs:
ID: {post.get('id', 'Unknown')}
UUID: {post.get('uuid', 'Unknown')}
""" """
except GhostError as e: except GhostError as e:
if ctx: if ctx:
ctx.error(f"Failed to read post: {str(e)}") ctx.error(f"Failed to read post: {str(e)}")
return str(e) return str(e)
async def update_post(post_id: str, update_data: dict, ctx: Context = None) -> str:
"""Update a blog post using lexical content.
Args:
post_id: The ID of the post to update
update_data: Dictionary containing the lexical content and updated_at timestamp.
Expected to have at least 'lexical' and 'updated_at' keys.
The lexical content must be a properly escaped JSON string in this format:
{
"root": {
"children": [
{
"children": [
{
"detail": 0,
"format": 0,
"mode": "normal",
"style": "",
"text": "Your content here",
"type": "text",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "paragraph",
"version": 1
}
],
"direction": "ltr",
"format": "",
"indent": 0,
"type": "root",
"version": 1
}
}
Example usage:
update_data = {
"post_id": "67abcffb7f82ac000179d76f",
"update_data": {
"lexical": "{\"root\":{\"children\":[{\"children\":[{\"detail\":0,\"format\":0,\"mode\":\"normal\",\"style\":\"\",\"text\":\"Hello World\",\"type\":\"text\",\"version\":1}],\"direction\":\"ltr\",\"format\":\"\",\"indent\":0,\"type\":\"paragraph\",\"version\":1}],\"direction\":\"ltr\",\"format\":\"\",\"indent\":0,\"type\":\"root\",\"version\":1}}",
"updated_at": "2025-02-11T22:54:40.000Z"
}
}
ctx: Optional context for logging
Returns:
Formatted string containing the updated post details
Raises:
GhostError: If there is an error accessing the Ghost API or missing required fields
"""
if ctx:
ctx.info(f"Updating post with ID: {post_id}")
if 'updated_at' not in update_data:
error_msg = "updated_at field is required for post updates"
if ctx:
ctx.error(error_msg)
return error_msg
if 'lexical' not in update_data:
error_msg = "lexical field is required for post updates"
if ctx:
ctx.error(error_msg)
return error_msg
try:
if ctx:
ctx.debug("Getting auth headers")
headers = await get_auth_headers(STAFF_API_KEY)
# Prepare update payload with lexical content
post_update = {
"posts": [{
"id": post_id,
"updated_at": update_data["updated_at"],
"lexical": update_data["lexical"]
}]
}
# Optionally handle tag updates if provided
import copy
update_fields = copy.deepcopy(update_data)
if "tags" in update_fields:
tags = update_fields.pop("tags")
post_update["posts"][0]["tags"] = [{"name": tag} if isinstance(tag, str) else tag for tag in tags]
# Copy any additional fields
for key, value in update_fields.items():
if key not in ["updated_at", "lexical"]:
post_update["posts"][0][key] = value
if ctx:
ctx.debug(f"Update payload: {json.dumps(post_update, indent=2)}")
if ctx:
ctx.debug(f"Making PUT request to /posts/{post_id}/")
data = await make_ghost_request(
f"posts/{post_id}/?formats=lexical&include=tags,authors",
headers,
ctx,
http_method="PUT",
json_data=post_update
)
if ctx:
ctx.debug(f"API Response: {json.dumps(data, indent=2)}")
post = data["posts"][0]
# Format tags and authors for display
tags = [tag.get('name', 'Unknown') for tag in post.get('tags', [])]
authors = [author.get('name', 'Unknown') for author in post.get('authors', [])]
# Get content preview (lexical content)
lexical_content = post.get('lexical', '')
content_preview = lexical_content[:200] + "..." if lexical_content else 'No content available'
return f"""
Post Updated Successfully:
Title: {post.get('title', 'Untitled')}
Slug: {post.get('slug', 'No slug')}
Status: {post.get('status', 'Unknown')}
Visibility: {post.get('visibility', 'Unknown')}
Featured: {post.get('featured', False)}
URL: {post.get('url', 'No URL')}
Tags: {', '.join(tags) if tags else 'None'}
Authors: {', '.join(authors) if authors else 'None'}
Published At: {post.get('published_at', 'Not published')}
Updated At: {post.get('updated_at', 'Unknown')}
Content Preview: {content_preview}
"""
except GhostError as e:
if ctx:
ctx.error(f"Failed to update post: {str(e)}")
return str(e)
async def search_posts_by_title(query: str, exact: bool = False, ctx: Context = None) -> str: async def search_posts_by_title(query: str, exact: bool = False, ctx: Context = None) -> str:
"""Search for posts by title. """Search for posts by title.