Cron jobs can't reach the desktop gpg-agent, so pass-based key lookup failed unattended. The .env file in the skill directory works for both cron and interactive runs. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
177 lines
5.3 KiB
Python
177 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Publish a weekly review article to Ghost via the Admin API.
|
|
|
|
Usage:
|
|
publish_to_ghost.py <article.md>
|
|
|
|
Reads the Ghost Admin API key from `pass show fulfillment/api-key`.
|
|
Title is derived from the filename stem. The first H1 of the markdown
|
|
is stripped before conversion (Ghost renders the title field itself).
|
|
The newsletter slug is hardcoded below — change NEWSLETTER_SLUG if the
|
|
publication is ever split into multiple newsletters.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import NoReturn
|
|
|
|
ADMIN_BASE = "https://blog.the-fulfillment.org/ghost/api/admin"
|
|
API_VERSION = "v5.0"
|
|
NEWSLETTER_SLUG = "default-newsletter"
|
|
|
|
|
|
def die(msg: str, code: int = 1) -> NoReturn:
|
|
print(f"error: {msg}", file=sys.stderr)
|
|
sys.exit(code)
|
|
|
|
|
|
def b64url(b: bytes) -> str:
|
|
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
|
|
|
|
|
|
def mint_jwt(key_id: str, secret_hex: str) -> str:
|
|
"""Mint an HS256 JWT for the Ghost Admin API (5-minute expiry)."""
|
|
header = b64url(
|
|
json.dumps(
|
|
{"alg": "HS256", "typ": "JWT", "kid": key_id}, separators=(",", ":")
|
|
).encode()
|
|
)
|
|
now = int(time.time())
|
|
payload = b64url(
|
|
json.dumps(
|
|
{"iat": now, "exp": now + 300, "aud": "/admin/"}, separators=(",", ":")
|
|
).encode()
|
|
)
|
|
signing_input = f"{header}.{payload}".encode()
|
|
sig = b64url(
|
|
hmac.new(bytes.fromhex(secret_hex), signing_input, hashlib.sha256).digest()
|
|
)
|
|
return f"{header}.{payload}.{sig}"
|
|
|
|
|
|
def get_api_key() -> tuple[str, str]:
|
|
"""Read GHOST_ADMIN_API_KEY=id:secret from .env in the skill directory."""
|
|
env_path = Path(__file__).parent / ".env"
|
|
if not env_path.is_file():
|
|
die(f"missing {env_path}")
|
|
value = None
|
|
for line in env_path.read_text().splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if line.startswith("GHOST_ADMIN_API_KEY="):
|
|
value = line.split("=", 1)[1].strip().strip('"').strip("'")
|
|
break
|
|
if value is None:
|
|
die(f"GHOST_ADMIN_API_KEY not found in {env_path}")
|
|
if ":" not in value:
|
|
die("GHOST_ADMIN_API_KEY is not in the expected id:secret form")
|
|
key_id, secret_hex = value.split(":", 1)
|
|
return key_id, secret_hex
|
|
|
|
|
|
def markdown_to_html(md: str) -> str:
|
|
"""Convert markdown body to HTML via pandoc."""
|
|
try:
|
|
result = subprocess.run(
|
|
["pandoc", "-f", "markdown", "-t", "html"],
|
|
input=md,
|
|
text=True,
|
|
capture_output=True,
|
|
check=True,
|
|
)
|
|
except FileNotFoundError:
|
|
die("`pandoc` not found on PATH")
|
|
except subprocess.CalledProcessError as e:
|
|
die(f"pandoc failed: {e.stderr}")
|
|
return result.stdout
|
|
|
|
|
|
def strip_leading_h1(md: str) -> str:
|
|
"""Drop the first non-blank line if it's an H1, then any leading blanks."""
|
|
lines = md.splitlines()
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == "":
|
|
continue
|
|
if line.startswith("# "):
|
|
lines = lines[i + 1 :]
|
|
break
|
|
# drop leading blanks
|
|
while lines and lines[0].strip() == "":
|
|
lines.pop(0)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def ghost_request(method: str, url: str, jwt: str, body: dict | None) -> dict:
|
|
headers = {
|
|
"Authorization": f"Ghost {jwt}",
|
|
"Accept-Version": API_VERSION,
|
|
"Content-Type": "application/json",
|
|
}
|
|
data = json.dumps(body).encode() if body is not None else None
|
|
req = urllib.request.Request(url, data=data, method=method, headers=headers)
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except urllib.error.HTTPError as e:
|
|
detail = e.read().decode(errors="replace")
|
|
die(f"{method} {url} → HTTP {e.code}\n{detail}")
|
|
|
|
|
|
def main() -> None:
|
|
if len(sys.argv) != 2:
|
|
die("usage: publish_to_ghost.py <article.md>", code=2)
|
|
|
|
article_path = Path(sys.argv[1])
|
|
|
|
if not article_path.is_file():
|
|
die(f"not a file: {article_path}")
|
|
|
|
title = article_path.stem
|
|
raw = article_path.read_text(encoding="utf-8")
|
|
body_md = strip_leading_h1(raw)
|
|
html = markdown_to_html(body_md)
|
|
|
|
key_id, secret_hex = get_api_key()
|
|
jwt = mint_jwt(key_id, secret_hex)
|
|
|
|
# Step 1: create draft
|
|
draft = ghost_request(
|
|
"POST",
|
|
f"{ADMIN_BASE}/posts/?source=html",
|
|
jwt,
|
|
{"posts": [{"title": title, "html": html, "status": "draft"}]},
|
|
)
|
|
post = draft["posts"][0]
|
|
post_id = post["id"]
|
|
updated_at = post["updated_at"]
|
|
print(f"draft created: id={post_id} updated_at={updated_at}")
|
|
|
|
# Step 2: publish + send newsletter
|
|
# Mint a fresh JWT in case step 1 took a while (5-min budget is plenty,
|
|
# but free).
|
|
jwt = mint_jwt(key_id, secret_hex)
|
|
published = ghost_request(
|
|
"PUT",
|
|
f"{ADMIN_BASE}/posts/{post_id}/"
|
|
f"?newsletter={NEWSLETTER_SLUG}&email_segment=all",
|
|
jwt,
|
|
{"posts": [{"updated_at": updated_at, "status": "published"}]},
|
|
)
|
|
p = published["posts"][0]
|
|
email_status = (p.get("email") or {}).get("status", "none")
|
|
print(f"published: {p.get('url')} (email: {email_status})")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|