homelab/k3s/health/checker.py
Samantha Atkins 58bfd422d4 Add homelab internal health checker
Python checker runs on pve-control via systemd timer every 10 min,
publishes issues to NATS subject homelab_health_issue. Checks NATS,
Postgres, MariaDB, Ghost blogs, DB dependents, standalone services,
and every NodePort. Silent when healthy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 15:48:07 -04:00

308 lines
9.1 KiB
Python

"""Homelab internal health checker.
Runs on pve-control every 10 minutes via systemd timer. Publishes issue
events to NATS subject `homelab_health_issue`. Silent when healthy.
See home_lab_health.md for design.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import socket
import subprocess
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional, Sequence
import requests
log = logging.getLogger("homelab-health")
# ---------- payload ----------
@dataclass
class Issue:
    """A single health problem, serialisable as one event payload."""

    # Name of the component the problem was observed on.
    component_name: str
    # Human-readable description of what went wrong.
    issue_detail: str
    # Timestamp string supplied by whoever creates the issue.
    detected_at: str
    # Optional upstream cause; stays None when none has been attributed.
    root_cause: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the fields as a dict, leaving out ``root_cause`` when it is None."""
        return {
            key: value
            for key, value in asdict(self).items()
            if key != "root_cause" or value is not None
        }
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with whole seconds."""
    current = datetime.now(tz=timezone.utc)
    # timespec="seconds" truncates (never rounds) the sub-second part.
    return current.isoformat(timespec="seconds")
# ---------- kubectl helpers ----------
def kubectl_json(args: Sequence[str], timeout: int = 15) -> dict:
    """Run ``kubectl <args> -o json`` and return the decoded JSON output.

    Raises RuntimeError carrying kubectl's stderr when the exit status is
    non-zero. A run exceeding *timeout* seconds surfaces as
    subprocess.TimeoutExpired from subprocess.run.
    """
    argv = ["kubectl"]
    argv.extend(args)
    argv += ["-o", "json"]
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    if proc.returncode == 0:
        return json.loads(proc.stdout)
    raise RuntimeError(f"kubectl {' '.join(args)}: {proc.stderr.strip()}")
@dataclass
class ExecResult:
    """Outcome of one kubectl_exec call."""

    # True only when the executed command exited with status 0; False for a
    # non-zero exit, a failed pod lookup, or a timeout.
    ok: bool
    # Captured standard output (empty string when the command never ran).
    stdout: str
    # Captured standard error with surrounding whitespace stripped, or a short
    # message written by kubectl_exec when the command never ran.
    stderr: str
def kubectl_exec(namespace: str, pod_label: str, command: Sequence[str],
                 timeout: int = 10) -> ExecResult:
    """Execute *command* inside the first pod selected by *pod_label*.

    The pod name is resolved with ``kubectl get pod -l`` (fixed 5 s timeout)
    and the command is then run with ``kubectl exec`` under *timeout* seconds.
    Lookup failures and timeouts are returned as ``ExecResult(ok=False)`` with
    an explanatory message in ``stderr`` rather than raised.
    """
    lookup_argv = [
        "kubectl", "get", "pod", "-n", namespace, "-l", pod_label,
        "-o", "jsonpath={.items[0].metadata.name}",
    ]
    try:
        lookup = subprocess.run(lookup_argv, capture_output=True, text=True,
                                timeout=5)
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", "pod lookup timed out")

    pod_name = lookup.stdout.strip()
    if lookup.returncode != 0 or not pod_name:
        return ExecResult(False, "", f"no pod matched {pod_label} in {namespace}")

    exec_argv = ["kubectl", "exec", "-n", namespace, pod_name, "--"]
    exec_argv.extend(command)
    try:
        finished = subprocess.run(exec_argv, capture_output=True, text=True,
                                  timeout=timeout)
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", f"kubectl exec timed out after {timeout}s")
    return ExecResult(finished.returncode == 0, finished.stdout,
                      finished.stderr.strip())
def resolve_nodeport(namespace: str, service: str) -> Optional[int]:
    """Fetch *service* in *namespace* and return its first NodePort.

    Returns None when no port entry carries a ``nodePort`` key. Errors raised
    by kubectl_json propagate to the caller.
    """
    spec = kubectl_json(["get", "svc", "-n", namespace, service]).get("spec", {})
    node_ports = (
        int(entry["nodePort"])
        for entry in spec.get("ports", [])
        if "nodePort" in entry
    )
    return next(node_ports, None)
# ---------- probes ----------
def probe_service(svc: dict) -> list[Issue]:
    """HTTP-probe one configured service through its NodePort on localhost.

    Reads ``name``, ``namespace``, ``probe_path`` and ``expected`` (the
    acceptable status codes) from *svc*; an entry with a truthy ``disabled``
    is skipped. Returns an empty list when the probe passes, otherwise a
    one-element list describing the failure. Redirects are not followed.
    """
    if svc.get("disabled"):
        return []

    component = svc["name"]

    def failed(detail: str) -> list[Issue]:
        # Every failure path reports exactly one issue for this service.
        return [Issue(component, detail, now_iso())]

    try:
        port = resolve_nodeport(svc["namespace"], component)
    except Exception as exc:
        return failed(f"kubectl failed: {exc}")
    if port is None:
        return failed("no NodePort exposed")

    target = f"http://localhost:{port}{svc['probe_path']}"
    try:
        reply = requests.get(target, timeout=10, allow_redirects=False)
    except requests.RequestException as exc:
        return failed(f"probe error at {target}: {exc}")

    if reply.status_code not in svc["expected"]:
        return failed(
            f"HTTP {reply.status_code} at {target} (expected {svc['expected']})")
    return []
# ---------- check functions ----------
def check_nats(cfg: dict) -> list[Issue]:
    """Check the NATS monitoring endpoint ``/healthz`` on localhost.

    The port comes from ``cfg["nats"]["monitoring_nodeport"]``. Returns an
    empty list on HTTP 200, otherwise a single ``nats`` issue.
    """
    endpoint = f"http://localhost:{cfg['nats']['monitoring_nodeport']}/healthz"
    try:
        status = requests.get(endpoint, timeout=5).status_code
    except requests.RequestException as exc:
        return [Issue("nats", f"monitoring unreachable: {exc}", now_iso())]
    if status == 200:
        return []
    return [Issue("nats", f"/healthz returned {status}", now_iso())]
def check_databases(cfg: dict) -> list[Issue]:
    """Run each configured database's probe command inside its pod.

    Every entry of ``cfg["databases"]`` supplies ``name``, ``namespace``,
    ``pod_label`` and ``probe_cmd``. One issue is returned per database whose
    probe did not succeed.
    """
    found: list[Issue] = []
    for entry in cfg.get("databases", []):
        outcome = kubectl_exec(entry["namespace"], entry["pod_label"],
                               entry["probe_cmd"])
        if outcome.ok:
            continue
        found.append(Issue(
            entry["name"],
            f"liveness probe failed: {outcome.stderr or '(no stderr)'}",
            now_iso(),
        ))
    return found
def _filter_probe(services, pred) -> list[Issue]:
    """Probe every service accepted by *pred* and concatenate the issues found."""
    return [
        issue
        for candidate in services
        if pred(candidate)
        for issue in probe_service(candidate)
    ]
def check_ghost_blogs(cfg):
    """Probe the configured services whose name starts with ``ghost``."""
    def is_ghost(svc) -> bool:
        return svc["name"].startswith("ghost")

    return _filter_probe(cfg.get("services", []), is_ghost)
def check_mariadb_dependents(cfg):
    """Probe the services whose ``db`` is ``"mariadb"``, excluding Ghost blogs.

    Services whose name starts with ``ghost`` are left out here (presumably
    because check_ghost_blogs covers them). A service entry without a ``db``
    key is treated as not depending on MariaDB instead of raising KeyError
    and aborting the whole check.
    """
    def depends_on_mariadb(svc) -> bool:
        return svc.get("db") == "mariadb" and not svc["name"].startswith("ghost")

    return _filter_probe(cfg.get("services", []), depends_on_mariadb)
def check_postgres_dependents(cfg):
    """Probe the services whose ``db`` is ``"postgres"``.

    A service entry without a ``db`` key is treated as not depending on
    Postgres instead of raising KeyError and aborting the whole check.
    """
    def depends_on_postgres(svc) -> bool:
        return svc.get("db") == "postgres"

    return _filter_probe(cfg.get("services", []), depends_on_postgres)
def check_standalone_services(cfg):
    """Probe the services that have no database dependency.

    A service counts as standalone when its ``db`` is None or the key is
    absent altogether; a missing key previously raised KeyError and aborted
    the whole check.
    """
    def is_standalone(svc) -> bool:
        return svc.get("db") is None

    return _filter_probe(cfg.get("services", []), is_standalone)
def check_all_nodeports(_cfg) -> list[Issue]:
    """TCP connect to every TCP NodePort in the cluster.

    Lists services in all namespaces via kubectl and attempts a TCP
    connection to 127.0.0.1 on each NodePort (3 s timeout). Ports whose
    protocol is not TCP are skipped, because a TCP handshake against a
    UDP/SCTP NodePort can never succeed and would be reported as a false
    outage. Returns one issue per port that refuses the connection; errors
    from kubectl_json propagate to the caller.
    """
    svcs = kubectl_json(["get", "svc", "-A"])
    issues: list[Issue] = []
    for item in svcs.get("items", []):
        meta = item.get("metadata", {})
        name = f"{meta.get('namespace')}/{meta.get('name')}"
        # ``or []`` also tolerates an explicit null ports value.
        for port in item.get("spec", {}).get("ports") or []:
            node_port = port.get("nodePort")
            if node_port is None:
                continue
            # The Service API defaults an omitted protocol to TCP.
            if port.get("protocol", "TCP") != "TCP":
                continue
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(3)
                rc = s.connect_ex(("127.0.0.1", int(node_port)))
            if rc != 0:
                issues.append(Issue(
                    name,
                    f"NodePort {node_port} not accepting TCP (errno {rc})",
                    now_iso(),
                ))
    return issues
# ---------- orchestration ----------
# Registry of check functions. Each takes the loaded config dict and returns a
# list of Issue objects. run_all_checks calls them in this order, keys its
# results by each function's __name__, and emits the issues in the same order.
CHECKS = [
    check_nats,
    check_databases,
    check_ghost_blogs,
    check_mariadb_dependents,
    check_postgres_dependents,
    check_standalone_services,
    check_all_nodeports,
]
def run_all_checks(cfg: dict) -> list[Issue]:
    """Run every function in CHECKS and return all issues in CHECKS order.

    A check that raises is reported as a single ``healthcheck.<name>`` issue
    instead of aborting the run. When the database check reported ``mariadb``
    or ``postgres``, issues from the dependent checks that have no root cause
    yet are annotated with the matching "... unreachable" root cause.
    """
    results: dict[str, list[Issue]] = {}
    for check in CHECKS:
        label = check.__name__
        try:
            results[label] = check(cfg)
        except Exception as exc:
            results[label] = [Issue(
                f"healthcheck.{label}",
                f"check raised: {type(exc).__name__}: {exc}",
                now_iso(),
                root_cause="healthcheck bug",
            )]

    db_issues = results.get("check_databases", [])

    def database_failed(component: str) -> bool:
        return any(issue.component_name == component for issue in db_issues)

    def attribute(bucket_names, cause: str) -> None:
        # Never overwrite a root cause that a check already assigned.
        for bucket_name in bucket_names:
            for issue in results.get(bucket_name, []):
                if issue.root_cause is None:
                    issue.root_cause = cause

    mariadb_down = database_failed("mariadb")
    postgres_down = database_failed("postgres")
    if mariadb_down:
        attribute(("check_mariadb_dependents", "check_ghost_blogs"),
                  "mariadb unreachable")
    if postgres_down:
        attribute(("check_postgres_dependents",), "postgres unreachable")

    return [
        issue
        for check in CHECKS
        for issue in results.get(check.__name__, [])
    ]
# ---------- NATS publish ----------
async def _publish(url: str, subject: str, payloads: list[bytes]) -> None:
    """Connect to NATS at *url*, publish each payload on *subject*, then close.

    The connection attempt is bounded (3 s connect timeout, 8 s overall, no
    reconnects). The connection is closed even when publishing or flushing
    fails.
    """
    # Imported here rather than at module level, so the module can be loaded
    # without the nats package as long as nothing is published.
    import nats  # type: ignore[import-not-found]

    pending = nats.connect(url, connect_timeout=3, allow_reconnect=False)  # type: ignore[attr-defined]
    client = await asyncio.wait_for(pending, timeout=8)
    try:
        for message in payloads:
            await client.publish(subject, message)
        await client.flush()
    finally:
        await client.close()
def publish_issues(issues: list[Issue], cfg: dict) -> None:
    """Publish each issue as UTF-8 JSON to the NATS subject named in *cfg*.

    Reads ``cfg["nats"]["url"]`` and ``cfg["nats"]["subject"]``. Does nothing,
    and does not touch *cfg*, when *issues* is empty.
    """
    if issues:
        encoded = [json.dumps(issue.to_dict()).encode("utf-8") for issue in issues]
        nats_cfg = cfg["nats"]
        asyncio.run(_publish(nats_cfg["url"], nats_cfg["subject"], encoded))
# ---------- entry ----------
def load_config(path: str) -> dict:
    """Load the checker configuration from the JSON file at *path*.

    The file is always decoded as UTF-8, so the result does not depend on the
    locale of the invoking environment. Raises OSError when the file cannot
    be opened and json.JSONDecodeError when it is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def main(argv=None) -> int:
    """Command-line entry point: run all checks, then report any issues.

    Returns 0 when no issue was found and 1 otherwise. Each issue is logged
    as a warning; with ``--dry-run`` the issues are also printed to stdout as
    JSON lines, otherwise they are published to NATS. A publish failure is
    logged and does not change the exit status.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
    )

    parser = argparse.ArgumentParser()
    parser.add_argument("--config",
                        default="/home/samantha/homelab-health/checks.json")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print issues to stdout; do not publish to NATS")
    opts = parser.parse_args(argv)

    cfg = load_config(opts.config)
    issues = run_all_checks(cfg)
    for issue in issues:
        log.warning("issue: %s", json.dumps(issue.to_dict()))
    if not issues:
        log.info("all checks green")
        return 0

    if opts.dry_run:
        for issue in issues:
            print(json.dumps(issue.to_dict()))
    else:
        try:
            publish_issues(issues, cfg)
        except Exception as exc:
            log.error("NATS publish failed: %s (issues not delivered)", exc)
    # Any issue found means a non-zero exit, whether or not it was delivered.
    return 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())