From 84edaa575b3fa49b5b5de0d9a0a163975fbcc44b Mon Sep 17 00:00:00 2001 From: Steven Van Ingelgem Date: Mon, 6 Apr 2026 14:34:38 +0200 Subject: [PATCH] feat: add models module, API helpers, and Post refactoring - Add models.py with Byline, PostMetadata, ScheduledRelease dataclasses - Add schedule_release(), get_post_metadata(), make_post_free() to Api - Extract _authenticate() and _resolve_publication() from __init__ - Add _get/_post/_put/_delete HTTP helpers, refactor all methods to use them - Add _handle_response allow_empty param with proper error chaining - Add _PRODUCTION_SUBDOMAINS safety guard - publish_draft now auto-runs prepublish validation - Use pathlib.Path instead of os.path/open for file operations - Post: dispatch table _ADD_HANDLERS replaces if/elif chain - Post: add paywall() and add_subscribe_button() methods - Post: extract _parse_markdown_blocks, _upload_or_passthrough, _render_image - Post: code_block uses match/case, _process_line handles inline headings - Refactored from_markdown() with HR support and better line merging - MCP: remove separate prepublish_draft tool (now automatic) - Add type annotations throughout all modules Co-Authored-By: Claude Opus 4.6 (1M context) --- substack/__init__.py | 3 + substack/api.py | 767 +++++++++++--------------------- substack/exceptions.py | 12 +- substack/models.py | 168 +++++++ substack/post.py | 877 ++++++++++++++++--------------------- substack_mcp/mcp_server.py | 52 +-- 6 files changed, 829 insertions(+), 1050 deletions(-) create mode 100644 substack/models.py diff --git a/substack/__init__.py b/substack/__init__.py index 4963dbe..b2ba233 100644 --- a/substack/__init__.py +++ b/substack/__init__.py @@ -9,3 +9,6 @@ __description__ = "A Python wrapper around the Substack API" from .api import Api +from .models import Byline, PostMetadata, ScheduledRelease + +__all__ = ["Api", "Byline", "PostMetadata", "ScheduledRelease"] diff --git a/substack/api.py b/substack/api.py index 676d86e..db5ff3a 100644 --- a/substack/api.py +++ b/substack/api.py @@ -1,19 +1,22 @@ -""" +"""API Wrapper.""" -API Wrapper - -""" +from __future__ import annotations import base64 import json import logging -import os -from datetime import datetime -from urllib.parse import urljoin, unquote +import re +from pathlib import Path +from typing import TYPE_CHECKING +from urllib.parse import unquote, urljoin import requests from substack.exceptions import SubstackAPIException, SubstackRequestException +from substack.models import PostMetadata + +if TYPE_CHECKING: + from datetime import datetime logger = logging.getLogger(__name__) @@ -21,62 +24,42 @@ class Api: - """ + """A python interface into the Substack API.""" - A python interface into the Substack API - - """ + _PRODUCTION_SUBDOMAINS: frozenset[str] = frozenset() def __init__( self, - email=None, - password=None, - cookies_path=None, - base_url=None, - publication_url=None, - debug=False, - cookies_string=None, - ): - """ - - To create an instance of the substack.Api class: - >>> import substack - >>> api = substack.Api(email="substack email", password="substack password") - - Args: - email: - password: - cookies_path - To re-use your session without logging in each time, you can save your cookies to a json file and - then load them in the next session. - Make sure to re-save your cookies, as they do update over time. - cookies_string - To re-use your session without logging in each time, you can provide cookies as a semicolon-separated - string (e.g., "cookie1=value1; cookie2=value2"). This is useful when copying cookies from browser - developer tools. - base_url: - The base URL to use to contact the Substack API. - Defaults to https://substack.com/api/v1. - """ + email: str | None = None, + password: str | None = None, + cookies_path: str | None = None, + base_url: str | None = None, + publication_url: str | None = None, + debug: bool = False, + cookies_string: str | None = None, + ) -> None: self.base_url = base_url or "https://substack.com/api/v1" - if debug: logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) - self._session = requests.Session() + self._authenticate(email, password, cookies_path, cookies_string) + self._resolve_publication(publication_url) - # Load cookies from file if provided - # Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud + def _authenticate( + self, + email: str | None, + password: str | None, + cookies_path: str | None, + cookies_string: str | None, + ) -> None: + """Set up session credentials from one of the supported auth methods.""" if cookies_path is not None: - with open(cookies_path) as f: - cookies = json.load(f) + cookies = json.loads(Path(cookies_path).read_text(encoding="utf-8")) self._session.cookies.update(cookies) - elif cookies_string is not None: cookies = self._parse_cookies_string(cookies_string) self._session.cookies.update(cookies) - elif email is not None and password is not None: self.login(email, password) else: @@ -84,207 +67,160 @@ def __init__( "Must provide email and password, cookies_path, or cookies_string to authenticate." ) - user_publication = None - # if the user provided a publication url, then use that - if publication_url: - import re - - # Regular expression to extract subdomain name - match = re.search(r"https://(.*).substack.com", publication_url.lower()) - subdomain = match.group(1) if match else None - - user_publications = self.get_user_publications() - # search through publications to find the publication with the matching subdomain - for publication in user_publications: - if publication["subdomain"] == subdomain: - # set the current publication to the users publication - user_publication = publication - break - else: - # get the users primary publication - user_publication = self.get_user_primary_publication() + def _resolve_publication(self, publication_url: str | None) -> None: + """Resolve and set the active publication.""" + if not publication_url: + self.change_publication(self.get_user_primary_publication()) + return + + match = re.search(r"https://(.*).substack.com", publication_url.lower()) + subdomain = match.group(1) if match else None - # set the current publication to the users primary publication - self.change_publication(user_publication) + if subdomain in self._PRODUCTION_SUBDOMAINS: + raise ValueError( + f"Subdomain '{subdomain}' is a PRODUCTION publication and is blocked." + ) + + # Try to find in user's publications + if pub := next( + (p for p in self.get_user_publications() if p["subdomain"] == subdomain), + None, + ): + self.change_publication(pub) + elif subdomain: + # Fallback: construct publication dict from URL + self.change_publication( + { + "subdomain": subdomain, + "publication_url": f"https://{subdomain}.substack.com", + } + ) + else: + self.change_publication(self.get_user_primary_publication()) @staticmethod def _parse_cookies_string(cookies_string: str) -> dict: - """ - Parse a semicolon-separated cookie string into a dictionary. - - Args: - cookies_string: A semicolon-separated string of cookies (e.g., "cookie1=value1; cookie2=value2") - - Returns: - A dictionary of cookie name-value pairs - """ cookies = {} - for cookie_pair in cookies_string.split(';'): + for cookie_pair in cookies_string.split(";"): cookie_pair = cookie_pair.strip() if not cookie_pair: continue - if '=' in cookie_pair: - key, value = cookie_pair.split('=', 1) + if "=" in cookie_pair: + key, value = cookie_pair.split("=", 1) key = key.strip() value = value.strip() - # URL decode the value (e.g., s%3A becomes s:) value = unquote(value) cookies[key] = value return cookies - def login(self, email, password) -> dict: - """ - - Login to the substack account. - - Args: - email: substack account email - password: substack account password - """ - - response = self._session.post( + def login(self, email: str, password: str) -> dict: + return self._post( f"{self.base_url}/login", - json={ - "captcha_response": None, - "email": email, - "for_pub": "", - "password": password, - "redirect": "/", - }, + captcha_response=None, + email=email, + for_pub="", + password=password, + redirect="/", ) - return Api._handle_response(response=response) - - def signin_for_pub(self, publication): - """ - Complete the signin process - """ + def signin_for_pub(self, publication: dict) -> dict: response = self._session.get( f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}", ) - try: - output = Api._handle_response(response=response) - except SubstackRequestException as ex: - output = {} - return output + return self._handle_response(response, allow_empty=True) or {} - def change_publication(self, publication): - """ - Change the publication URL - """ + def change_publication(self, publication: dict) -> None: + self.publication = publication self.publication_url = urljoin(publication["publication_url"], "api/v1") - - # sign-in to the publication self.signin_for_pub(publication) - def export_cookies(self, path: str = "cookies.json"): - """ - Export cookies to a json file. - Args: - path: path to the json file - """ + def export_cookies(self, path: str = "cookies.json") -> None: cookies = self._session.cookies.get_dict() - with open(path, "w") as f: - json.dump(cookies, f) + Path(path).write_text(json.dumps(cookies), encoding="utf-8") @staticmethod - def _handle_response(response: requests.Response): - """ - - Internal helper for handling API responses from the Substack server. - Raises the appropriate exceptions when necessary; otherwise, returns the - response. - - """ - + def _handle_response( + response: requests.Response, *, allow_empty: bool = False + ) -> dict | list | None: if not (200 <= response.status_code < 300): raise SubstackAPIException(response.status_code, response.text) + if allow_empty: + try: + return response.json() + except ValueError: + return None try: return response.json() - except ValueError: - raise SubstackRequestException("Invalid Response: %s" % response.text) + except ValueError as err: + raise SubstackRequestException( + f"Invalid Response: {response.text}" + ) from err - def get_user_id(self): - """ + # ---- HTTP helper methods ---- - Returns: + def _get(self, url: str, **params: str | int | None) -> dict | list | None: + response = self._session.get(url, params=params or None) + return Api._handle_response(response) - """ - profile = self.get_user_profile() - user_id = profile["id"] + def _post( + self, url: str, **json_data: str | int | bool | None + ) -> dict | list | None: + response = self._session.post(url, json=json_data) + return Api._handle_response(response) - return user_id + def _put( + self, url: str, **json_data: str | int | bool | None + ) -> dict | list | None: + response = self._session.put(url, json=json_data) + return Api._handle_response(response) + + def _delete(self, url: str) -> dict | list | None: + response = self._session.delete(url) + return Api._handle_response(response) + + def get_user_id(self) -> int: + profile = self.get_user_profile() + return profile["id"] @staticmethod def get_publication_url(publication: dict) -> str: - """ - Gets the publication url + if domain := ( + publication.get("custom_domain") + or publication.get("custom_domain_optional") + ): + return f"https://{domain}" + return f"https://{publication['subdomain']}.substack.com" + + def get_user_primary_publication(self) -> dict: + profile = self.get_user_profile() - Args: - publication: - """ - custom_domain = publication.get("custom_domain", None) - if not custom_domain and not publication.get('custom_domain_optional', None): - publication_url = f"https://{publication['subdomain']}.substack.com" - else: - publication_url = f"https://{custom_domain}" + if pp := profile.get("primaryPublication"): + pp["publication_url"] = self.get_publication_url(pp) + return pp - return publication_url + pub_users = profile.get("publicationUsers") or [] - def get_user_primary_publication(self): - """ - Gets the users primary publication - """ - - profile = self.get_user_profile() - primary_publication = None - - # Try old API format first (backward compatibility) - if "primaryPublication" in profile and profile["primaryPublication"] is not None: - primary_publication = profile["primaryPublication"] - else: - # New API format: look for primary publication in publicationUsers - publication_users = profile.get("publicationUsers") - if publication_users is not None and len(publication_users) > 0: - # Find the publication where is_primary is True - for pub_user in publication_users: - if pub_user.get("is_primary", False): - primary_publication = pub_user.get("publication") - if primary_publication: - break - - # If no primary found, use the first publication - if primary_publication is None: - primary_publication = publication_users[0].get("publication") - - if primary_publication is None: - raise SubstackRequestException( - "Could not find primary publication in profile" - ) - - primary_publication["publication_url"] = self.get_publication_url( - primary_publication - ) + # Find the one marked as primary + for pu in pub_users: + if pu.get("is_primary", False) and (pub := pu.get("publication")): + pub["publication_url"] = self.get_publication_url(pub) + return pub - return primary_publication + # Last resort: first publication in the list + if pub_users and (pub := pub_users[0].get("publication")): + pub["publication_url"] = self.get_publication_url(pub) + return pub - def get_user_publications(self): - """ - Gets the users publications - """ + raise SubstackRequestException("Could not find primary publication in profile") + def get_user_publications(self) -> list[dict]: profile = self.get_user_profile() - - # Loop through users "publicationUsers" list, and return a list - # of dictionaries of "name", and "subdomain", and "id" - user_publications = [] + user_publications: list[dict] = [] publication_users = profile.get("publicationUsers") - + if publication_users is None: - # If publicationUsers is None, return empty list or try to construct from other fields - # This maintains backward compatibility while handling new API format return user_publications - + for publication in publication_users: pub = publication.get("publication") if pub is not None: @@ -293,333 +229,154 @@ def get_user_publications(self): return user_publications - def get_user_profile(self): - """ - Gets the users profile - """ - response = self._session.get(f"{self.base_url}/user/profile/self") - - return Api._handle_response(response=response) - - def get_user_settings(self): - """ - Get list of users. - - Returns: - - """ - response = self._session.get(f"{self.base_url}/settings") - - return Api._handle_response(response=response) - - def get_publication_users(self): - """ - Get list of users. - - Returns: - - """ - response = self._session.get(f"{self.publication_url}/publication/users") - - return Api._handle_response(response=response) - - def get_publication_subscriber_count(self): - - """ - Get subscriber count. + def get_user_profile(self) -> dict: + return self._get(f"{self.base_url}/user/profile/self") - Returns: + def get_user_settings(self) -> dict: + return self._get(f"{self.base_url}/settings") - """ - response = self._session.get( - f"{self.publication_url}/publication_launch_checklist" - ) + def get_publication_users(self) -> list[dict]: + return self._get(f"{self.publication_url}/publication/users") - return Api._handle_response(response=response)["subscriberCount"] + def get_publication_subscriber_count(self) -> int: + return self._get(f"{self.publication_url}/publication_launch_checklist")[ + "subscriberCount" + ] def get_published_posts( - self, offset=0, limit=25, order_by="post_date", order_direction="desc" - ): - """ - Get list of published posts for the publication. - """ - response = self._session.get( + self, + offset: int = 0, + limit: int = 25, + order_by: str = "post_date", + order_direction: str = "desc", + ) -> dict: + return self._get( f"{self.publication_url}/post_management/published", - params={ - "offset": offset, - "limit": limit, - "order_by": order_by, - "order_direction": order_direction, - }, + offset=offset, + limit=limit, + order_by=order_by, + order_direction=order_direction, ) - return Api._handle_response(response=response) - def get_posts(self) -> dict: - """ - - Returns: + return self._get(f"{self.base_url}/reader/posts") - """ - response = self._session.get(f"{self.base_url}/reader/posts") - - return Api._handle_response(response=response) - - def get_drafts(self, filter=None, offset=None, limit=None): - """ - - Args: - filter: - offset: - limit: - - Returns: - - """ - response = self._session.get( + def get_drafts( + self, + filter: str | None = None, + offset: int | None = None, + limit: int | None = None, + ) -> list[dict]: + return self._get( f"{self.publication_url}/drafts", - params={"filter": filter, "offset": offset, "limit": limit}, + filter=filter, + offset=offset, + limit=limit, ) - return Api._handle_response(response=response) - def get_draft(self, draft_id): - """ - Gets a draft given it's id. + def get_draft(self, draft_id: int) -> dict: + return self._get(f"{self.publication_url}/drafts/{draft_id}") - """ - response = self._session.get(f"{self.publication_url}/drafts/{draft_id}") - return Api._handle_response(response=response) + def delete_draft(self, draft_id: int) -> dict: + return self._delete(f"{self.publication_url}/drafts/{draft_id}") - def delete_draft(self, draft_id): - """ - - Args: - draft_id: - - Returns: - - """ - response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}") - return Api._handle_response(response=response) - - def post_draft(self, body) -> dict: - """ - - Args: - body: - - Returns: - - """ + def post_draft(self, body: dict) -> dict: response = self._session.post(f"{self.publication_url}/drafts", json=body) - return Api._handle_response(response=response) - - def put_draft(self, draft, **kwargs) -> dict: - """ + return Api._handle_response(response) - Args: - draft: - **kwargs: - - Returns: - - """ + def put_draft(self, draft: int, **kwargs: str | int | bool | None) -> dict: response = self._session.put( f"{self.publication_url}/drafts/{draft}", json=kwargs, ) - return Api._handle_response(response=response) - - def prepublish_draft(self, draft) -> dict: - """ - - Args: - draft: draft id - - Returns: - - """ - - response = self._session.get( - f"{self.publication_url}/drafts/{draft}/prepublish" - ) - return Api._handle_response(response=response) + return Api._handle_response(response) def publish_draft( - self, draft, send: bool = True, share_automatically: bool = False + self, draft: int, send: bool = True, share_automatically: bool = False ) -> dict: - """ + # Run prepublish validation (matches browser flow) + pre = self._get(f"{self.publication_url}/drafts/{draft}/prepublish") + if pre.get("errors"): + logger.warning(f"Prepublish warnings for draft {draft}: {pre['errors']}") - Args: - draft: draft id - send: - share_automatically: - - Returns: - - """ - response = self._session.post( + return self._post( f"{self.publication_url}/drafts/{draft}/publish", - json={"send": send, "share_automatically": share_automatically}, + send=send, + share_automatically=share_automatically, ) - return Api._handle_response(response=response) - - def schedule_draft(self, draft, draft_datetime: datetime) -> dict: - """ - - Args: - draft: draft id - draft_datetime: datetime to schedule the draft - Returns: - - """ - response = self._session.post( + def schedule_draft(self, draft: int, draft_datetime: datetime) -> dict: + return self._post( f"{self.publication_url}/drafts/{draft}/schedule", - json={"post_date": draft_datetime.isoformat()}, + post_date=draft_datetime.isoformat(), ) - return Api._handle_response(response=response) - - def unschedule_draft(self, draft) -> dict: - """ - - Args: - draft: draft id - Returns: - - """ - response = self._session.post( - f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None} + def unschedule_draft(self, draft: int) -> dict: + return self._post( + f"{self.publication_url}/drafts/{draft}/schedule", + post_date=None, ) - return Api._handle_response(response=response) - - def get_image(self, image: str): - """ - - This method generates a new substack link that contains the image. - Args: - image: filepath or original url of image. - - Returns: - - """ - if os.path.exists(image): - with open(image, "rb") as file: - image = b"data:image/jpeg;base64," + base64.b64encode(file.read()) + def get_image(self, image: str) -> dict: + image_path = Path(image) + if image_path.exists(): + image = b"data:image/jpeg;base64," + base64.b64encode( + image_path.read_bytes() + ) response = self._session.post( f"{self.publication_url}/image", data={"image": image}, ) return Api._handle_response(response=response) - - def add_tags_to_post(self, post_id: int, tag_names: list) -> dict: - """ - Add multiple tags to a post. - - Args: - post_id: The ID of the post to tag. - tag_names: A list of tag names to add. - Returns: - A dictionary with the results of applying all tags. - """ + def add_tags_to_post(self, post_id: int, tag_names: list[str]) -> dict: results = [] for tag_name in tag_names: result = self.add_tag_to_post(post_id, tag_name) results.append(result) return {"tags_added": results} - def get_publication_post_tags(self) -> list: - """ - Retrieve all post tags for the current publication. - - Returns: - List of tag dicts as returned by Substack API. - """ - response = self._session.get(f"{self.publication_url}/publication/post-tag") - return Api._handle_response(response=response) + def get_publication_post_tags(self) -> list[dict]: + return self._get(f"{self.publication_url}/publication/post-tag") def add_tag_to_post(self, post_id: int, tag_name: str) -> dict: - """ - Add a tag to a post by first checking published tags and creating only if needed. - - Args: - post_id: The ID of the post to tag. - tag_name: The name of the tag to add. - - Returns: - The response from applying the tag to the post. - """ - # Fetch existing publication tags first (avoid re-creating an already existing tag) existing_tags = self.get_publication_post_tags() or [] - existing_tag = next( - (tag for tag in existing_tags if tag.get("name") == tag_name), - None, - ) - - if existing_tag is not None: - tag_id = existing_tag["id"] + if existing := next( + (t for t in existing_tags if t.get("name") == tag_name), None + ): + tag_id = existing["id"] else: - create_tag_response = self._session.post( - f"{self.publication_url}/publication/post-tag", - json={"name": tag_name}, + tag_data = self._post( + f"{self.publication_url}/publication/post-tag", name=tag_name ) - tag_data = Api._handle_response(create_tag_response) tag_id = tag_data["id"] - apply_tag_response = self._session.post( - f"{self.publication_url}/post/{post_id}/tag/{tag_id}", + response = self._session.post( + f"{self.publication_url}/post/{post_id}/tag/{tag_id}" ) - return Api._handle_response(apply_tag_response) - - - def get_categories(self): - """ + return Api._handle_response(response) - Retrieve list of all available categories. + def get_categories(self) -> list[dict]: + return self._get(f"{self.base_url}/categories") - Returns: - - """ - response = self._session.get(f"{self.base_url}/categories") - return Api._handle_response(response=response) - - def get_category(self, category_id, category_type, page): - """ - - Args: - category_id: - category_type: - page: - - Returns: - - """ - response = self._session.get( + def get_category(self, category_id: int, category_type: str, page: int) -> dict: + return self._get( f"{self.base_url}/category/public/{category_id}/{category_type}", - params={"page": page}, + page=page, ) - return Api._handle_response(response=response) - def get_single_category(self, category_id, category_type, page=None, limit=None): - """ - - Args: - category_id: - category_type: paid or all - page: by default substack retrieves only the first 25 publications in the category. If this is left None, - then all pages will be retrieved. The page size is 25 publications. - limit: - Returns: - - """ + def get_single_category( + self, + category_id: int, + category_type: str, + page: int | None = None, + limit: int | None = None, + ) -> dict: if page is not None: output = self.get_category(category_id, category_type, page) else: - publications = [] + publications: list[dict] = [] page = 0 while True: page_output = self.get_category(category_id, category_type, page) @@ -636,12 +393,7 @@ def get_single_category(self, category_id, category_type, page=None, limit=None) } return output - def delete_all_drafts(self): - """ - - Returns: - - """ + def delete_all_drafts(self) -> dict | None: response = None while True: drafts = self.get_drafts(filter="draft", limit=10, offset=0) @@ -651,14 +403,7 @@ def delete_all_drafts(self): response = self.delete_draft(draft.get("id")) return response - def get_sections(self): - """ - Get a list of the sections of your publication. - - TODO: this is hacky but I cannot find another place where to get the sections. - Returns: - - """ + def get_sections(self) -> list[dict]: response = self._session.get( f"{self.publication_url}/subscriptions", ) @@ -670,31 +415,53 @@ def get_sections(self): ] return sections[0] - def publication_embed(self, url): - """ - - Args: - url: - - Returns: - - """ + def publication_embed(self, url: str) -> dict: return self.call("/publication/embed", "GET", url=url) - def call(self, endpoint, method, **params): - """ - - Args: - endpoint: - method: - **params: - - Returns: - - """ + def call( + self, endpoint: str, method: str, **params: str | int | None + ) -> dict | list | None: response = self._session.request( method=method, url=f"{self.publication_url}/{endpoint}", params=params, ) return Api._handle_response(response=response) + + # ---- Higher-level helpers returning dataclasses ---- + + def schedule_release( + self, + draft_id: int, + trigger_at: datetime, + post_audience: str = "everyone", + email_audience: str = "only_free", + ) -> None: + """Schedule a post's audience change (e.g. paid -> free). + + Uses POST /drafts/{id}/scheduled_release. + """ + response = self._session.post( + f"{self.publication_url}/drafts/{draft_id}/scheduled_release", + json={ + "trigger_at": trigger_at.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + "post_audience": post_audience, + "email_audience": email_audience, + }, + ) + # Substack returns empty response on success + self._handle_response(response, allow_empty=True) + + def get_post_metadata(self, draft_id: int) -> PostMetadata: + """Get full post metadata as a dataclass.""" + data = self.get_draft(draft_id) + return PostMetadata.from_api(data) + + def make_post_free(self, post_id: int) -> PostMetadata: + """Set a post's audience and comments to everyone.""" + data = self.put_draft( + post_id, + audience="everyone", + write_comment_permissions="everyone", + ) + return PostMetadata.from_api(data) diff --git a/substack/exceptions.py b/substack/exceptions.py index e9b6f29..c357759 100644 --- a/substack/exceptions.py +++ b/substack/exceptions.py @@ -2,29 +2,27 @@ class SubstackAPIException(Exception): - def __init__(self, status_code, text): + def __init__(self, status_code: int, text: str) -> None: try: json_res = json.loads(text) except ValueError: self.message = f"Invalid JSON error message from Substack: {text}" else: self.message = ", ".join( - list( - map(lambda error: error.get("msg", ""), json_res.get("errors", [])) - ) + [error.get("msg", "") for error in json_res.get("errors", [])] ) self.message = self.message or json_res.get("error", "") self.status_code = status_code - def __str__(self): + def __str__(self) -> str: return f"APIError(code={self.status_code}): {self.message}" class SubstackRequestException(Exception): - def __init__(self, message): + def __init__(self, message: str) -> None: self.message = message - def __str__(self): + def __str__(self) -> str: return f"SubstackRequestException: {self.message}" diff --git a/substack/models.py b/substack/models.py new file mode 100644 index 0000000..9a7cc55 --- /dev/null +++ b/substack/models.py @@ -0,0 +1,168 @@ +"""Dataclasses for Substack API responses.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime + + +def _parse_dt(value: str | None) -> datetime | None: + if not value: + return None + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +@dataclass +class Byline: + id: int + name: str + handle: str | None = None + photo_url: str | None = None + bio: str | None = None + + @classmethod + def from_api(cls, data: dict) -> Byline: + return cls( + id=data["id"], + name=data.get("name", ""), + handle=data.get("handle"), + photo_url=data.get("photo_url"), + bio=data.get("bio"), + ) + + +@dataclass +class ScheduledRelease: + trigger_at: datetime + post_audience: str + email_audience: str + + @classmethod + def from_api(cls, data: dict) -> ScheduledRelease: + return cls( + trigger_at=_parse_dt(data["trigger_at"]), + post_audience=data.get("post_audience", ""), + email_audience=data.get("email_audience", ""), + ) + + +@dataclass +class PostMetadata: + """Metadata for a Substack post (draft or published).""" + + id: int + title: str | None + subtitle: str | None + slug: str | None + type: str + uuid: str + audience: str + write_comment_permissions: str + is_published: bool + publication_id: int + + post_date: datetime | None = None + draft_created_at: datetime | None = None + draft_updated_at: datetime | None = None + email_sent_at: datetime | None = None + updated_at: datetime | None = None + + section_id: int | None = None + subscriber_set_id: int | None = None + + cover_image: str | None = None + search_engine_title: str | None = None + search_engine_description: str | None = None + + should_send_email: bool = True + should_send_free_preview: bool = False + hide_from_feed: bool = False + teaser_post_eligible: bool = True + meter_type: str = "none" + + free_unlock_required: bool = False + exempt_from_archive_paywall: bool = False + + bylines: list[Byline] = field(default_factory=list) + scheduled_releases: list[ScheduledRelease] = field(default_factory=list) + + # Raw response for any fields not mapped above + _raw: dict = field(default_factory=dict, repr=False) + + @property + def needs_comment_fix(self) -> bool: + """True if post is free but comments are still restricted.""" + return ( + self.audience == "everyone" and self.write_comment_permissions != "everyone" + ) + + @classmethod + def from_api(cls, data: dict) -> PostMetadata: + bylines = [ + Byline.from_api(b) + for b in data.get("publishedBylines") or data.get("draftBylines") or [] + ] + schedules = [ + ScheduledRelease.from_api(s) for s in data.get("postSchedules") or [] + ] + + return cls( + id=data["id"], + title=data.get("title") or data.get("draft_title"), + subtitle=data.get("subtitle") or data.get("draft_subtitle"), + slug=data.get("slug"), + type=data.get("type", "newsletter"), + uuid=data.get("uuid", ""), + audience=data.get("audience", "everyone"), + write_comment_permissions=data.get("write_comment_permissions", "everyone"), + is_published=data.get("is_published", False), + publication_id=data.get("publication_id", 0), + post_date=_parse_dt(data.get("post_date")), + draft_created_at=_parse_dt(data.get("draft_created_at")), + draft_updated_at=_parse_dt(data.get("draft_updated_at")), + email_sent_at=_parse_dt(data.get("email_sent_at")), + updated_at=_parse_dt(data.get("updated_at")), + section_id=data.get("section_id"), + subscriber_set_id=data.get("subscriber_set_id"), + cover_image=data.get("cover_image"), + search_engine_title=data.get("search_engine_title"), + search_engine_description=data.get("search_engine_description"), + should_send_email=data.get("should_send_email", True), + should_send_free_preview=data.get("should_send_free_preview", False), + hide_from_feed=data.get("hide_from_feed", False), + teaser_post_eligible=data.get("teaser_post_eligible", True), + meter_type=data.get("meter_type", "none"), + free_unlock_required=data.get("free_unlock_required", False), + exempt_from_archive_paywall=data.get("exempt_from_archive_paywall", False), + bylines=bylines, + scheduled_releases=schedules, + _raw=data, + ) + + def print_summary(self) -> None: + """Print a human-readable summary of the post metadata.""" + print(f"{'=' * 50}") + print(f"Post: {self.title}") + print(f"{'=' * 50}") + print(f" id: {self.id}") + print(f" slug: {self.slug}") + print(f" type: {self.type}") + print(f" uuid: {self.uuid}") + print(f" audience: {self.audience}") + print(f" comments: {self.write_comment_permissions}") + print(f" published: {self.is_published}") + print(f" post_date: {self.post_date}") + print(f" send_email: {self.should_send_email}") + print(f" meter_type: {self.meter_type}") + print(f" free_unlock: {self.free_unlock_required}") + if self.bylines: + names = ", ".join(b.name for b in self.bylines) + print(f" bylines: {names}") + if self.scheduled_releases: + for sr in self.scheduled_releases: + print( + f" scheduled_release: {sr.trigger_at} -> {sr.post_audience} (email: {sr.email_audience})" + ) + if self.needs_comment_fix: + print(" *** NEEDS COMMENT FIX ***") + print(f"{'=' * 50}") diff --git a/substack/post.py b/substack/post.py index 60feaaf..5199b97 100644 --- a/substack/post.py +++ b/substack/post.py @@ -1,246 +1,175 @@ -""" +"""Post Utilities.""" -Post Utilities - -""" +from __future__ import annotations import json import re -from typing import Dict, List - -__all__ = ["Post", "parse_inline"] +from typing import TYPE_CHECKING, ClassVar from substack.exceptions import SectionNotExistsException +if TYPE_CHECKING: + from substack.api import Api -def parse_inline(text: str) -> List[Dict]: - """ - Convert inline Markdown in a text string into a list of tokens - for use in the post content. - - Supported formatting: - - **Bold**: Text wrapped in double asterisks. - - *Italic*: Text wrapped in single asterisks. - - [Links]: Text wrapped in square brackets followed by URL in parentheses. - - Args: - text: Text string containing inline Markdown formatting. +__all__ = ["Post", "parse_inline"] - Returns: - List of token dictionaries with content and marks. - Example: - >>> parse_inline("This is **bold** and this is [a link](https://example.com)") - [{'content': 'This is '}, {'content': 'bold', 'marks': [{'type': 'strong'}]}, {'content': ' and this is '}, {'content': 'a link', 'marks': [{'type': 'link', 'attrs': {'href': 'https://example.com'}}]}] - """ +def parse_inline(text: str) -> list[dict]: + """Convert inline Markdown in a text string into a list of tokens.""" if not text: return [] - tokens = [] - # Process text character by character to handle nested formatting - # We'll use regex to find all markdown patterns, then process them in order - - # Find all markdown patterns: links, bold, italic - # Pattern order: links first (to avoid conflicts), then bold, then italic - link_pattern = r'\[([^\]]+)\]\(([^)]+)\)' - bold_pattern = r'\*\*([^*]+)\*\*' - italic_pattern = r'(? last_pos: tokens.append({"content": text[last_pos:start]}) - # Add the formatted content if match_type == "link": - tokens.append({ - "content": content, - "marks": [{"type": "link", "attrs": {"href": url}}] - }) + tokens.append( + { + "content": content, + "marks": [{"type": "link", "attrs": {"href": url}}], + } + ) elif match_type == "bold": - tokens.append({ - "content": content, - "marks": [{"type": "strong"}] - }) + tokens.append({"content": content, "marks": [{"type": "strong"}]}) elif match_type == "italic": - tokens.append({ - "content": content, - "marks": [{"type": "em"}] - }) + tokens.append({"content": content, "marks": [{"type": "em"}]}) last_pos = end - # Add remaining text if last_pos < len(text): tokens.append({"content": text[last_pos:]}) - # Filter out empty tokens - tokens = [t for t in tokens if t.get("content")] - - return tokens + return [t for t in tokens if t.get("content")] + + +def _tokens_to_text_nodes(tokens: list[dict]) -> list[dict]: + """Convert parse_inline tokens to ProseMirror text nodes.""" + nodes: list[dict] = [] + for t in tokens: + if not t: + continue + node: dict = {"type": "text", "text": t["content"]} + marks = t.get("marks") + if marks: + pm_marks: list[dict] = [] + for m in marks: + pm: dict = {"type": m["type"]} + if m["type"] == "link": + pm["attrs"] = {"href": m.get("attrs", {}).get("href", "")} + pm_marks.append(pm) + node["marks"] = pm_marks + nodes.append(node) + return nodes class Post: - """ + """Post utility class.""" - Post utility class - - """ + _ADD_HANDLERS: ClassVar[dict[str, str]] = { + "captionedImage": "_handle_captioned_image", + "embeddedPublication": "_handle_embedded_publication", + "youtube2": "_handle_youtube", + "subscribeWidget": "_handle_subscribe_widget", + "codeBlock": "_handle_code_block", + } def __init__( self, title: str, subtitle: str, - user_id, - audience: str = None, - write_comment_permissions: str = None, - ): - """ - - Args: - title: - subtitle: - user_id: - audience: possible values: everyone, only_paid, founding, only_free - write_comment_permissions: none, only_paid, everyone (this field is a mess) - """ + user_id: int | str, + audience: str | None = None, + write_comment_permissions: str | None = None, + ) -> None: self.draft_title = title self.draft_subtitle = subtitle - self.draft_body = {"type": "doc", "content": []} + self.draft_body: dict = {"type": "doc", "content": []} self.draft_bylines = [{"id": int(user_id), "is_guest": False}] self.audience = audience if audience is not None else "everyone" - self.draft_section_id = None + self.draft_section_id: int | None = None self.section_chosen = True - # TODO better understand the possible values and combinations with audience - if write_comment_permissions is not None: - self.write_comment_permissions = write_comment_permissions - else: - self.write_comment_permissions = self.audience - - def set_section(self, name: str, sections: list): - """ - - Args: - name: - sections: - - Returns: + self.write_comment_permissions = write_comment_permissions or self.audience - """ + def set_section(self, name: str, sections: list[dict]) -> None: section = [s for s in sections if s.get("name") == name] if len(section) != 1: raise SectionNotExistsException(name) section = section[0] self.draft_section_id = section.get("id") - def add(self, item: Dict): - """ - - Add item to draft body. - - Args: - item: - - Returns: - - """ - + def add(self, item: dict) -> Post: + item_type = item.get("type") self.draft_body["content"] = self.draft_body.get("content", []) + [ - {"type": item.get("type")} + {"type": item_type} ] - content = item.get("content") - if item.get("type") == "captionedImage": - self.captioned_image(**item) - elif item.get("type") == "embeddedPublication": - self.draft_body["content"][-1]["attrs"] = item.get("url") - elif item.get("type") == "youtube2": - self.youtube(item.get("src")) - elif item.get("type") == "subscribeWidget": - self.subscribe_with_caption(item.get("message")) - elif item.get("type") == "codeBlock": - self.code_block(item.get("content"), item.get("attrs", {})) - else: - if content is not None: - self.add_complex_text(content) - if item.get("type") == "heading": - self.attrs(item.get("level", 1)) + if handler_name := self._ADD_HANDLERS.get(item_type): + getattr(self, handler_name)(item) + elif (content := item.get("content")) is not None: + self.add_complex_text(content) - marks = item.get("marks") - if marks is not None: + if item_type == "heading": + self.attrs(item.get("level", 1)) + if marks := item.get("marks"): self.marks(marks) return self - def paragraph(self, content=None): - """ + def _handle_captioned_image(self, item: dict) -> None: + self.captioned_image(**item) - Args: - content: + def _handle_embedded_publication(self, item: dict) -> None: + self.draft_body["content"][-1]["attrs"] = item.get("url") - Returns: + def _handle_youtube(self, item: dict) -> None: + self.youtube(item.get("src")) - """ - item = {"type": "paragraph"} + def _handle_subscribe_widget(self, item: dict) -> None: + self.subscribe_with_caption(item.get("message")) + + def _handle_code_block(self, item: dict) -> None: + self.code_block(item.get("content"), item.get("attrs", {})) + + def paragraph(self, content: str | None = None) -> Post: + item: dict = {"type": "paragraph"} if content is not None: item["content"] = content return self.add(item) - def heading(self, content=None, level: int = 1): - """ - - Args: - content: - level: - - Returns: - - """ - - item = {"type": "heading"} + def heading(self, content: str | None = None, level: int = 1) -> Post: + item: dict = {"type": "heading"} if content is not None: item["content"] = content item["level"] = level return self.add(item) - def blockquote(self, content=None): - """ - Add a blockquote to the post. - - The blockquote wraps one or more paragraph nodes. - - Args: - content: Text string or list of inline token dicts. When a plain - string is provided it is wrapped in a single paragraph node. - - Returns: - Self for method chaining. - """ - paragraphs: List[Dict] = [] + def blockquote(self, content: str | list | None = None) -> Post: + paragraphs: list[dict] = [] if content is not None: if isinstance(content, str): tokens = parse_inline(content) @@ -251,35 +180,52 @@ def blockquote(self, content=None): paragraphs.append({"type": "paragraph", "content": text_nodes}) elif isinstance(content, list): for item in content: - if isinstance(item, dict) and item.get("type") == "paragraph": + if not isinstance(item, dict): + continue + if item.get("type") == "paragraph": paragraphs.append(item) - elif isinstance(item, dict): + else: text_nodes = [{"type": "text", "text": item.get("content", "")}] paragraphs.append({"type": "paragraph", "content": text_nodes}) - node: Dict = {"type": "blockquote"} + node: dict = {"type": "blockquote"} if paragraphs: node["content"] = paragraphs self.draft_body["content"] = self.draft_body.get("content", []) + [node] return self - def horizontal_rule(self): - """ + def paywall(self) -> Post: + """Insert a paywall boundary. Content above is the free preview, below is paid-only.""" + return self.add({"type": "paywall"}) - Returns: + def add_subscribe_button(self, *, with_caption: bool = False) -> Post: + """Add a subscribe button. - """ - return self.add({"type": "horizontal_rule"}) + Args: + with_caption: If True, adds the large widget with caption text. + If False (default), adds the small single-button version. - def attrs(self, level): """ + if with_caption: + return self.add({"type": "subscribeWidget"}) - Args: - level: + self.draft_body["content"] = self.draft_body.get("content", []) + [ + { + "type": "button", + "attrs": { + "url": "%%checkout_url%%", + "text": "Subscribe now", + "action": None, + "class": None, + }, + } + ] + return self - Returns: + def horizontal_rule(self) -> Post: + return self.add({"type": "horizontal_rule"}) - """ + def attrs(self, level: int) -> Post: content_attrs = self.draft_body["content"][-1].get("attrs", {}) content_attrs.update({"level": level}) self.draft_body["content"][-1]["attrs"] = content_attrs @@ -293,34 +239,14 @@ def captioned_image( height: int = 819, width: int = 1456, resizeWidth: int = 728, - bytes: str = None, - alt: str = None, - title: str = None, - type: str = None, - href: str = None, + bytes: str | None = None, + alt: str | None = None, + title: str | None = None, + type: str | None = None, + href: str | None = None, belowTheFold: bool = False, - internalRedirect: str = None, - ): - """ - - Add image to body. - - Args: - bytes: - alt: - title: - type: - href: - belowTheFold: - internalRedirect: - src: - fullscreen: - imageSize: - height: - width: - resizeWidth: - """ - + internalRedirect: str | None = None, + ) -> Post: content = self.draft_body["content"][-1].get("content", []) content += [ { @@ -345,28 +271,13 @@ def captioned_image( self.draft_body["content"][-1]["content"] = content return self - def text(self, value: str): - """ - - Add text to the last paragraph. - - Args: - value: Text to add to paragraph. - - Returns: - - """ + def text(self, value: str) -> Post: content = self.draft_body["content"][-1].get("content", []) content += [{"type": "text", "text": value}] self.draft_body["content"][-1]["content"] = content return self - def add_complex_text(self, text): - """ - - Args: - text: - """ + def add_complex_text(self, text: str | list[dict]) -> None: if isinstance(text, str): self.text(text) else: @@ -374,19 +285,13 @@ def add_complex_text(self, text): if chunk: self.text(chunk.get("content")).marks(chunk.get("marks", [])) - def marks(self, marks): - """ - - Args: - marks: - - Returns: - - """ + def marks(self, marks: list[dict]) -> Post: + if not marks: + return self content = self.draft_body["content"][-1].get("content", [])[-1] content_marks = content.get("marks", []) for mark in marks: - new_mark = {"type": mark.get("type")} + new_mark: dict = {"type": mark.get("type")} if mark.get("type") == "link": href = mark.get("href") or mark.get("attrs", {}).get("href") new_mark.update({"attrs": {"href": href}}) @@ -394,35 +299,17 @@ def marks(self, marks): content["marks"] = content_marks return self - def remove_last_paragraph(self): - """Remove last paragraph""" + def remove_last_paragraph(self) -> None: del self.draft_body.get("content")[-1] - def get_draft(self): - """ - - Returns: - - """ + def get_draft(self) -> dict: out = vars(self) out["draft_body"] = json.dumps(out["draft_body"]) return out - def subscribe_with_caption(self, message: str = None): - """ - - Add subscribe widget with caption - - Args: - message: - - Returns: - - """ - + def subscribe_with_caption(self, message: str | None = None) -> Post: if message is None: - message = """Thanks for reading this newsletter! - Subscribe for free to receive new posts and support my work.""" + message = "Thanks for reading this newsletter! Subscribe for free to receive new posts and support my work." subscribe = self.draft_body["content"][-1] subscribe["attrs"] = { @@ -433,56 +320,26 @@ def subscribe_with_caption(self, message: str = None): subscribe["content"] = [ { "type": "ctaCaption", - "content": [ - { - "type": "text", - "text": message, - } - ], + "content": [{"type": "text", "text": message}], } ] return self - def youtube(self, value: str): - """ - - Add youtube video to post. - - Args: - value: youtube url - - Returns: - - """ + def youtube(self, value: str) -> Post: content_attrs = self.draft_body["content"][-1].get("attrs", {}) content_attrs.update({"videoId": value}) self.draft_body["content"][-1]["attrs"] = content_attrs return self - def code_block(self, content, attrs=None): - """ - Add code block to post. + def code_block(self, content: str | list | None, attrs: dict | None = None) -> Post: + match content: + case str(): + code_content = [{"type": "text", "text": content}] + case list(): + code_content = content + case _: + code_content = [] - Args: - content: String containing code or list of text nodes - attrs: Optional attributes like language - - Returns: - - """ - if attrs is None: - attrs = {} - - # Handle content - can be list of text nodes or a string - if isinstance(content, str): - # Convert string to list of text nodes - code_content = [{"type": "text", "text": content}] - elif isinstance(content, list): - code_content = content - else: - code_content = [] - - # Set up the code block structure code_block = self.draft_body["content"][-1] code_block["content"] = code_content if attrs: @@ -490,256 +347,264 @@ def code_block(self, content, attrs=None): return self - def from_markdown(self, markdown_content: str, api=None): - """ - Parse Markdown content and add it to the post. - - Supported Markdown features: - - Headings: Lines starting with '#' characters (1-6 levels) - - Images: Markdown image syntax ![Alt](URL) - - Linked images: [![Alt](image_url)](link_url) - images that are also links - - Links: [text](url) - inline links in paragraphs - - Code blocks: Fenced code blocks with ```language or ``` - - Blockquotes: Lines starting with '>' (consecutive lines grouped) - - Paragraphs: Regular text blocks - - Bullet lists: Lines starting with '*' or '-' - - Inline formatting: **bold** and *italic* within paragraphs - - Args: - markdown_content: Markdown string to parse and add to the post. - api: Optional Api instance for uploading local images. If provided, - local image paths will be uploaded via api.get_image(). - - Returns: - Self for method chaining. - - Example: - >>> post = Post("Title", "Subtitle", user_id) - >>> post.from_markdown("# Heading\\n\\nThis is **bold** text with [a link](https://example.com).") - """ - lines = markdown_content.split("\n") - blocks = [] - current_block: List[str] = [] + @staticmethod + def _parse_markdown_blocks(lines: list[str]) -> list[dict]: + """Parse lines into a list of text/code block dicts.""" + blocks: list[dict] = [] + current_block: list[str] = [] in_code_block = False - code_block_language = None + code_block_language: str | None = None for line in lines: - # Check for fenced code block start/end if line.strip().startswith("```"): if in_code_block: - # End of code block if current_block: - blocks.append({ - "type": "code", - "language": code_block_language, - "content": "\n".join(current_block) - }) + blocks.append( + { + "type": "code", + "language": code_block_language, + "content": "\n".join(current_block), + } + ) current_block = [] in_code_block = False code_block_language = None else: - # Start of code block if current_block: - blocks.append({"type": "text", "content": "\n".join(current_block)}) + blocks.append( + {"type": "text", "content": "\n".join(current_block)} + ) current_block = [] - # Extract language if specified language = line.strip()[3:].strip() code_block_language = language if language else None in_code_block = True continue if in_code_block: - # Inside code block - collect lines as-is current_block.append(line) else: - # Regular content if line.strip() == "": - # Empty line - end current block if it has content if current_block: - blocks.append({"type": "text", "content": "\n".join(current_block)}) + blocks.append( + {"type": "text", "content": "\n".join(current_block)} + ) current_block = [] else: current_block.append(line) - # Add any remaining content if current_block: if in_code_block: - blocks.append({ - "type": "code", - "language": code_block_language, - "content": "\n".join(current_block) - }) + blocks.append( + { + "type": "code", + "language": code_block_language, + "content": "\n".join(current_block), + } + ) else: blocks.append({"type": "text", "content": "\n".join(current_block)}) - # Process blocks + return blocks + + @staticmethod + def _upload_or_passthrough(image_url: str, api: Api | None = None) -> str: + """Strip leading slash from URL, optionally upload via api.""" + if image_url.startswith("/"): + image_url = image_url[1:] + if api is not None: + try: + image = api.get_image(image_url) + image_url = image.get("url") + except Exception: + pass + return image_url + + def _render_image(self, text_content: str, api: Api | None = None) -> None: + """Handle linked and regular image markdown syntax.""" + linked_image_match = re.match( + r"\[!\[([^\]]*)\]\(([^)]+)\)\]\(([^)]+)\)", text_content + ) + if linked_image_match: + alt_text = linked_image_match.group(1) + image_url = self._upload_or_passthrough(linked_image_match.group(2), api) + link_url = linked_image_match.group(3) + self.add( + { + "type": "captionedImage", + "src": image_url, + "alt": alt_text, + "href": link_url, + } + ) + else: + match = re.match(r"!\[.*?\]\((.*?)\)", text_content) + if match: + image_url = self._upload_or_passthrough(match.group(1), api) + self.add({"type": "captionedImage", "src": image_url}) + + def from_markdown(self, markdown_content: str, api: Api | None = None) -> Post: + """Parse Markdown content and add it to the post.""" + lines = markdown_content.split("\n") + blocks = self._parse_markdown_blocks(lines) + + # -- Render blocks into ProseMirror nodes -- + # Track pending bullet items across blocks so consecutive bullet + # blocks (even separated by blank lines) merge into one bullet_list. + pending_bullets: list[list[dict]] = [] + + def flush_bullets() -> None: + if not pending_bullets: + return + list_items = [] + for bullet_tokens in pending_bullets: + list_items.append( + { + "type": "list_item", + "content": [ + { + "type": "paragraph", + "content": _tokens_to_text_nodes(bullet_tokens), + } + ], + } + ) + self.draft_body["content"].append( + {"type": "bullet_list", "content": list_items} + ) + pending_bullets.clear() + + def _is_hr(text: str) -> bool: + stripped = text.strip() + return stripped in ("---", "***", "___") or ( + len(stripped) >= 3 + and set(stripped) <= {"-", "*", "_", " "} + and any(c * 3 in stripped for c in "-*_") + ) + + def _extract_bullet(line: str) -> str | None: + """Return bullet text if line is a bullet, else None.""" + if line.startswith("- "): + return line[2:].strip() + if line.startswith("* "): + return line[2:].strip() + if line.startswith("*") and not line.startswith("**"): + return line[1:].strip() + return None + + pending_para_lines: list[str] = [] + pending_quote_paras: list[dict] = [] + + def flush_para() -> None: + """Merge accumulated plain-text lines into one paragraph.""" + if not pending_para_lines: + return + merged = " ".join(pending_para_lines) + tokens = parse_inline(merged) + self.add({"type": "paragraph", "content": tokens}) + pending_para_lines.clear() + + def flush_quotes() -> None: + """Emit accumulated blockquote paragraphs as one blockquote node.""" + if not pending_quote_paras: + return + node: dict = { + "type": "blockquote", + "content": list(pending_quote_paras), + } + self.draft_body["content"].append(node) + pending_quote_paras.clear() + + def _flush_all() -> None: + flush_para() + flush_quotes() + flush_bullets() + + def _process_line(line: str) -> None: + """Process a single line of text content.""" + # Heading + if line.startswith("#"): + _flush_all() + level = len(line) - len(line.lstrip("#")) + if heading_text := line.lstrip("#").strip(): + self.heading(content=heading_text, level=min(level, 6)) + return + + if (bullet_text := _extract_bullet(line)) is not None: + flush_para() + flush_quotes() + if tokens := parse_inline(bullet_text): + pending_bullets.append(tokens) + return + + # Not a bullet — flush any pending bullets first + flush_bullets() + + if line.startswith("> ") or line == ">": + flush_para() + quote_text = line[2:] if line.startswith("> ") else "" + text_nodes = _tokens_to_text_nodes(parse_inline(quote_text)) + para = ( + {"type": "paragraph", "content": text_nodes} + if text_nodes + else {"type": "paragraph"} + ) + pending_quote_paras.append(para) + else: + flush_quotes() + pending_para_lines.append(line) + for block in blocks: if block["type"] == "code": - # Add code block - code_content = block.get("content", "").strip() - if code_content: - # Substack uses "codeBlock" type - code_attrs = {} + _flush_all() + if code_content := block.get("content", "").strip(): + code_attrs: dict = {} if block.get("language"): code_attrs["language"] = block["language"] - self.add({ - "type": "codeBlock", - "content": code_content, # Pass as string, code_block method will handle it - "attrs": code_attrs - }) - else: - # Process text block - text_content = block.get("content", "").strip() - if not text_content: - continue - - # Process headings (lines starting with '#' characters) - if text_content.startswith("#"): - level = len(text_content) - len(text_content.lstrip("#")) - heading_text = text_content.lstrip("#").strip() - if heading_text: # Only add if there's actual text - self.heading(content=heading_text, level=min(level, 6)) - - # Process images using Markdown image syntax: ![Alt](URL) - # Also handle linked images: [![Alt](image_url)](link_url) - elif text_content.startswith("!") or (text_content.startswith("[") and "![" in text_content): - # Check for linked image first: [![alt](img)](link) - linked_image_match = re.match(r'\[!\[([^\]]*)\]\(([^)]+)\)\]\(([^)]+)\)', text_content) - if linked_image_match: - # Linked image - create image with href - alt_text = linked_image_match.group(1) - image_url = linked_image_match.group(2) - link_url = linked_image_match.group(3) - - # Adjust image URL if it starts with a slash - image_url = image_url[1:] if image_url.startswith("/") else image_url - - # If api is provided and image_url is a local file, upload it - if api is not None: - try: - image = api.get_image(image_url) - image_url = image.get("url") - except Exception: - # If upload fails, use original URL - pass - - self.add({ - "type": "captionedImage", - "src": image_url, - "alt": alt_text, - "href": link_url - }) - else: - # Regular image: ![Alt](URL) - match = re.match(r"!\[.*?\]\((.*?)\)", text_content) - if match: - image_url = match.group(1) - # Adjust image URL if it starts with a slash - image_url = image_url[1:] if image_url.startswith("/") else image_url - - # If api is provided and image_url is a local file, upload it - if api is not None: - try: - image = api.get_image(image_url) - image_url = image.get("url") - except Exception: - # If upload fails, use original URL - pass - - self.add({"type": "captionedImage", "src": image_url}) - - # Process paragraphs, bullet lists, or blockquotes - else: - if "\n" in text_content: - # Process each line, grouping consecutive bullets - # into a single bullet_list node and consecutive - # blockquote lines into a single blockquote node. - pending_bullets: List[List[Dict]] = [] - pending_quotes: List[str] = [] - - def flush_bullets(): - if not pending_bullets: - return - list_items = [] - for bullet_nodes in pending_bullets: - list_items.append({ - "type": "list_item", - "content": [{"type": "paragraph", "content": bullet_nodes}], - }) - self.draft_body["content"].append( - {"type": "bullet_list", "content": list_items} - ) - pending_bullets.clear() - - def flush_quotes(): - if not pending_quotes: - return - paragraphs: List[Dict] = [] - for quote_line in pending_quotes: - tokens = parse_inline(quote_line) - text_nodes = [ - {"type": "text", "text": t["content"]} - for t in tokens if t - ] - if text_nodes: - paragraphs.append({"type": "paragraph", "content": text_nodes}) - node: Dict = {"type": "blockquote"} - if paragraphs: - node["content"] = paragraphs - self.draft_body["content"].append(node) - pending_quotes.clear() - - for line in text_content.split("\n"): - line = line.strip() - if not line: - flush_bullets() - flush_quotes() - continue - - # Check for blockquote marker - if line.startswith("> ") or line == ">": - flush_bullets() - quote_text = line[2:] if line.startswith("> ") else "" - pending_quotes.append(quote_text) - continue - - # Check for bullet marker - bullet_text = None - if line.startswith("* "): - bullet_text = line[2:].strip() - elif line.startswith("- "): - bullet_text = line[2:].strip() - elif line.startswith("*") and not line.startswith("**"): - bullet_text = line[1:].strip() - - if bullet_text is not None: - flush_quotes() - tokens = parse_inline(bullet_text) - if tokens: - pending_bullets.append(tokens) - else: - flush_bullets() - flush_quotes() - tokens = parse_inline(line) - self.add({"type": "paragraph", "content": tokens}) - - flush_bullets() - flush_quotes() - else: - # Single line — could be a blockquote or paragraph - if text_content.startswith("> ") or text_content == ">": - quote_text = text_content[2:] if text_content.startswith("> ") else "" - tokens = parse_inline(quote_text) - text_nodes = [ - {"type": "text", "text": t["content"]} - for t in tokens if t - ] - para = {"type": "paragraph", "content": text_nodes} if text_nodes else {"type": "paragraph"} - self.draft_body["content"] = self.draft_body.get("content", []) + [ - {"type": "blockquote", "content": [para]} - ] - else: - tokens = parse_inline(text_content) - self.add({"type": "paragraph", "content": tokens}) + self.add( + { + "type": "codeBlock", + "content": code_content, + "attrs": code_attrs, + } + ) + continue + + if not (text_content := block.get("content", "").strip()): + continue + + # Horizontal rule + if _is_hr(text_content): + _flush_all() + self.draft_body["content"].append({"type": "horizontal_rule"}) + continue + + # Heading — only the first line; remaining lines are processed as text + if text_content.startswith("#"): + _flush_all() + first_line, _, rest = text_content.partition("\n") + level = len(first_line) - len(first_line.lstrip("#")) + if heading_text := first_line.lstrip("#").strip(): + self.heading(content=heading_text, level=min(level, 6)) + if rest.strip(): + for line in rest.split("\n"): + if line := line.strip(): + _process_line(line) + continue + + # Image + if text_content.startswith("!") or ( + text_content.startswith("[") and "![" in text_content + ): + _flush_all() + self._render_image(text_content, api) + continue + + # Text content — may contain bullets, blockquotes, paragraphs + flush_para() + flush_quotes() + for line in text_content.split("\n"): + if line := line.strip(): + _process_line(line) + _flush_all() return self diff --git a/substack_mcp/mcp_server.py b/substack_mcp/mcp_server.py index df0ea8c..12842ce 100644 --- a/substack_mcp/mcp_server.py +++ b/substack_mcp/mcp_server.py @@ -1,7 +1,7 @@ from __future__ import annotations import os -from typing import Any, Dict, List, Optional +from typing import Any, Optional try: from dotenv import load_dotenv @@ -43,7 +43,7 @@ def get_api() -> Api: ) -def _normalize_tags(tags: Optional[Any]) -> List[str]: +def _normalize_tags(tags: Optional[Any]) -> list[str]: if tags is None: return [] if isinstance(tags, str): @@ -68,15 +68,15 @@ async def post_draft_from_markdown( slug: Optional[str] = None, draft_section_id: Optional[int] = None, tags: Optional[Any] = None, - prepublish: bool = False, publish: bool = False, send: bool = True, share_automatically: bool = False, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Create or update a Substack draft from Markdown. This tool builds a Substack `Post` from markdown content and posts a draft. - It supports optional tag assignment, prepublish (setup check), and publishing. + It supports optional tag assignment and publishing. Prepublish validation + runs automatically when publishing. Args: title: Draft title. @@ -89,13 +89,12 @@ async def post_draft_from_markdown( slug: Optional URL slug for the post. draft_section_id: Optional section ID for the draft. tags: Tag or list of tags to attach to the post. - prepublish: If true, calls `prepublish_draft` after creation. - publish: If true, calls `publish_draft` after creation (and optionally prepublish). + publish: If true, calls `publish_draft` after creation (prepublish runs automatically). send: Passed to `publish_draft` for newsletter delivery. share_automatically: Passed to `publish_draft`. Returns: - dict containing drafted post (`draft`), optional `tags`, `prepublish`, `publish` results. + dict containing drafted post (`draft`), optional `tags`, `publish` results. Examples: With the YAML structure from the README, a caller can map fields like: @@ -113,7 +112,6 @@ async def post_draft_from_markdown( tags: - python - substack - prepublish: true publish: true send: false share_automatically: true @@ -131,18 +129,16 @@ async def post_draft_from_markdown( audience='everyone', write_comment_permissions='everyone', tags=['python', 'substack'], - prepublish=True, publish=False, # set true when ready ) print(result) ``` - A longer process with manual prepublish/publish calls: + A longer process with manual publish calls: ```python from substack_mcp.mcp_server import ( post_draft_from_markdown, - prepublish_draft, publish_draft, add_tags, ) @@ -156,7 +152,6 @@ async def post_draft_from_markdown( draft_id = d['draft']['id'] await add_tags(draft_id, ['post-tag', 'news']) - await prepublish_draft(draft_id) await publish_draft(draft_id, send=True, share_automatically=True) ``` @@ -177,7 +172,7 @@ async def post_draft_from_markdown( draft = client.post_draft(post.get_draft()) - update_payload: Dict[str, Any] = {} + update_payload: dict[str, Any] = {} if search_engine_title: update_payload["search_engine_title"] = search_engine_title if search_engine_description: @@ -195,10 +190,6 @@ async def post_draft_from_markdown( if tags_list: tags_result = client.add_tags_to_post(draft.get("id"), tags_list) - prepublish_result = None - if prepublish: - prepublish_result = client.prepublish_draft(draft.get("id")) - publish_result = None if publish: publish_result = client.publish_draft( @@ -208,7 +199,6 @@ async def post_draft_from_markdown( return { "draft": draft, "tags": tags_result, - "prepublish": prepublish_result, "publish": publish_result, } @@ -216,8 +206,8 @@ async def post_draft_from_markdown( @mcp.tool() async def put_draft( draft_id: int, - update_payload: Dict[str, Any], -) -> Dict[str, Any]: + update_payload: dict[str, Any], +) -> dict[str, Any]: """Update an existing draft by draft ID. Args: @@ -232,7 +222,7 @@ async def put_draft( @mcp.tool() -async def add_tags(draft_id: int, tags: Any) -> Dict[str, Any]: +async def add_tags(draft_id: int, tags: Any) -> dict[str, Any]: """Add tags to a specific draft/post. Args: @@ -249,28 +239,16 @@ async def add_tags(draft_id: int, tags: Any) -> Dict[str, Any]: return client.add_tags_to_post(draft_id, tags_list) -@mcp.tool() -async def prepublish_draft(draft_id: int) -> Dict[str, Any]: - """Invoke prepublish checks for a draft. - - Args: - draft_id: target draft identifier. - - Returns: - Prepublish response dict from Substack API. - """ - client = get_api() - return client.prepublish_draft(draft_id) - - @mcp.tool() async def publish_draft( draft_id: int, send: bool = True, share_automatically: bool = False, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Publish a draft to live post state. + Prepublish validation runs automatically before publishing. + Args: draft_id: target draft identifier. send: if False then do not send email to subscribers.