"""Homelab internal health checker.

Runs on pve-control every 10 minutes via systemd timer.
Publishes issue events to NATS (subject taken from config, e.g.
`homelab_health_issue`). Silent on NATS when healthy.

See home_lab_health.md for design.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import socket
import subprocess
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional, Sequence

import requests

log = logging.getLogger("homelab-health")


# ---------- payload ----------


@dataclass
class Issue:
    """One detected problem; ``to_dict()`` is the JSON payload published to NATS."""

    component_name: str
    issue_detail: str
    detected_at: str  # produced by now_iso()
    root_cause: Optional[str] = None  # set by run_all_checks when a DB outage explains it

    def to_dict(self) -> dict:
        """Return the issue as a plain dict, omitting ``root_cause`` when it is None."""
        d = asdict(self)
        if d["root_cause"] is None:
            del d["root_cause"]
        return d


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string truncated to whole seconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


# ---------- kubectl helpers ----------


def kubectl_json(args: Sequence[str], timeout: int = 15) -> dict:
    """Run ``kubectl <args> -o json`` and return the parsed output.

    Raises:
        RuntimeError: kubectl exited non-zero (message includes its stderr).
        subprocess.TimeoutExpired: kubectl did not finish within *timeout* seconds.
        json.JSONDecodeError: stdout was not valid JSON.
    """
    cmd = ["kubectl", *args, "-o", "json"]
    r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if r.returncode != 0:
        raise RuntimeError(f"kubectl {' '.join(args)}: {r.stderr.strip()}")
    return json.loads(r.stdout)


@dataclass
class ExecResult:
    """Outcome of kubectl_exec."""

    ok: bool  # True only when the in-pod command exited 0
    stdout: str  # raw stdout of the in-pod command ("" on lookup/timeout failure)
    stderr: str  # stripped stderr, or a description of why the exec could not run


def kubectl_exec(namespace: str, pod_label: str, command: Sequence[str],
                 timeout: int = 10) -> ExecResult:
    """Run a command inside the first pod matching pod_label.

    Lookup and exec timeouts are reported via ``ExecResult`` rather than raised.
    NOTE(review): "first pod" is ``.items[0]`` of kubectl's listing; pod phase is
    not checked, so a Pending/Terminating pod could be picked — confirm acceptable.
    """
    try:
        n = subprocess.run(
            ["kubectl", "get", "pod", "-n", namespace, "-l", pod_label,
             "-o", "jsonpath={.items[0].metadata.name}"],
            capture_output=True, text=True, timeout=5,
        )
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", "pod lookup timed out")
    # An empty selector result makes the jsonpath index fail (non-zero exit) or
    # print nothing; both mean "no pod to exec into".
    if n.returncode != 0 or not n.stdout.strip():
        return ExecResult(False, "", f"no pod matched {pod_label} in {namespace}")
    pod = n.stdout.strip()
    try:
        r = subprocess.run(
            ["kubectl", "exec", "-n", namespace, pod, "--", *command],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", f"kubectl exec timed out after {timeout}s")
    return ExecResult(r.returncode == 0, r.stdout, r.stderr.strip())


def resolve_nodeport(namespace: str, service: str) -> Optional[int]:
    """Return the first NodePort on the given service, or None.

    Propagates any exception raised by kubectl_json.
    """
    svc = kubectl_json(["get", "svc", "-n", namespace, service])
    for port in svc.get("spec", {}).get("ports", []):
        if "nodePort" in port:
            return int(port["nodePort"])
    return None


# ---------- probes ----------


def probe_service(svc: dict) -> list[Issue]:
    """HTTP-probe one configured service through its NodePort on localhost.

    Reads keys ``name``, ``namespace``, ``probe_path``, ``expected`` (a collection
    of acceptable status codes) and optional ``disabled`` from *svc*.
    Returns [] when disabled or when the status code is expected, otherwise a
    single Issue.
    """
    if svc.get("disabled"):
        return []
    name = svc["name"]
    try:
        nodeport = resolve_nodeport(svc["namespace"], name)
    except Exception as e:
        return [Issue(name, f"kubectl failed: {e}", now_iso())]
    if nodeport is None:
        return [Issue(name, "no NodePort exposed", now_iso())]
    url = f"http://localhost:{nodeport}{svc['probe_path']}"
    try:
        # Redirects are not followed, so a 3xx can itself be listed as expected.
        resp = requests.get(url, timeout=10, allow_redirects=False)
    except requests.RequestException as e:
        return [Issue(name, f"probe error at {url}: {e}", now_iso())]
    if resp.status_code in svc["expected"]:
        return []
    return [Issue(name, f"HTTP {resp.status_code} at {url} (expected {svc['expected']})", now_iso())]


# ---------- check functions ----------


def check_nats(cfg: dict) -> list[Issue]:
    """GET ``/healthz`` on the NATS monitoring NodePort; any non-200 is an issue."""
    port = cfg["nats"]["monitoring_nodeport"]
    url = f"http://localhost:{port}/healthz"
    try:
        r = requests.get(url, timeout=5)
    except requests.RequestException as e:
        return [Issue("nats", f"monitoring unreachable: {e}", now_iso())]
    if r.status_code != 200:
        return [Issue("nats", f"/healthz returned {r.status_code}", now_iso())]
    return []


def check_databases(cfg: dict) -> list[Issue]:
    """Run each configured database's ``probe_cmd`` inside its pod.

    The Issue's component_name is the database's configured ``name``;
    run_all_checks relies on that name for root-cause attribution.
    """
    issues = []
    for db in cfg.get("databases", []):
        result = kubectl_exec(db["namespace"], db["pod_label"], db["probe_cmd"])
        if not result.ok:
            # Some probe commands print their failure reason only on stdout;
            # fall back to it so the issue carries something actionable.
            reason = result.stderr or result.stdout.strip() or "(no stderr)"
            issues.append(Issue(
                db["name"],
                f"liveness probe failed: {reason}",
                now_iso(),
            ))
    return issues


def _filter_probe(services, pred) -> list[Issue]:
    """Probe every service for which ``pred(service)`` is true and collect issues."""
    out = []
    for s in services:
        if pred(s):
            out.extend(probe_service(s))
    return out


# Service categories below use ``s.get("db")`` so that one entry lacking a "db"
# key is treated as standalone instead of raising KeyError and aborting the
# whole category.
# NOTE(review): a "ghost*" service whose db is not "mariadb" also matches the
# postgres/standalone predicates and would be probed twice — confirm the
# config keeps ghost* services on mariadb.


def check_ghost_blogs(cfg) -> list[Issue]:
    """Probe services whose name starts with "ghost"."""
    return _filter_probe(cfg.get("services", []), lambda s: s["name"].startswith("ghost"))


def check_mariadb_dependents(cfg) -> list[Issue]:
    """Probe non-ghost services configured with ``db == "mariadb"``."""
    return _filter_probe(
        cfg.get("services", []),
        lambda s: s.get("db") == "mariadb" and not s["name"].startswith("ghost"),
    )


def check_postgres_dependents(cfg) -> list[Issue]:
    """Probe services configured with ``db == "postgres"``."""
    return _filter_probe(cfg.get("services", []), lambda s: s.get("db") == "postgres")


def check_standalone_services(cfg) -> list[Issue]:
    """Probe services with no database (``db`` null or absent)."""
    return _filter_probe(cfg.get("services", []), lambda s: s.get("db") is None)


def check_all_nodeports(_cfg) -> list[Issue]:
    """TCP connect to every NodePort in the cluster.

    Connects to 127.0.0.1:<nodePort> for every Service in all namespaces and
    reports the errno from ``connect_ex`` for any port that does not accept.
    NOTE(review): config is ignored, so ports of services marked ``disabled``
    (and ones already HTTP-probed) are checked here too — may duplicate issues.
    """
    svcs = kubectl_json(["get", "svc", "-A"])
    issues = []
    for item in svcs.get("items", []):
        meta = item.get("metadata", {})
        name = f"{meta.get('namespace')}/{meta.get('name')}"
        for port in item.get("spec", {}).get("ports", []):
            np = port.get("nodePort")
            if np is None:
                continue
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(3)
                # connect_ex returns an errno instead of raising on failure.
                rc = s.connect_ex(("127.0.0.1", int(np)))
            if rc != 0:
                issues.append(Issue(
                    name,
                    f"NodePort {np} not accepting TCP (errno {rc})",
                    now_iso(),
                ))
    return issues


# ---------- orchestration ----------

# Order here is also the order issues are emitted in.
CHECKS = [
    check_nats,
    check_databases,
    check_ghost_blogs,
    check_mariadb_dependents,
    check_postgres_dependents,
    check_standalone_services,
    check_all_nodeports,
]


def run_all_checks(cfg: dict) -> list[Issue]:
    """Run every check in CHECKS and return all issues in CHECKS order.

    A check that raises is converted into a single issue with
    ``root_cause="healthcheck bug"``. When check_databases reported an issue
    for a component named exactly "mariadb" / "postgres", dependent-service
    issues that have no root cause yet are attributed to that outage.
    """
    buckets: dict[str, list[Issue]] = {}
    for fn in CHECKS:
        try:
            buckets[fn.__name__] = fn(cfg)
        except Exception as e:
            buckets[fn.__name__] = [Issue(
                f"healthcheck.{fn.__name__}",
                f"check raised: {type(e).__name__}: {e}",
                now_iso(),
                root_cause="healthcheck bug",
            )]

    db_issues = buckets.get("check_databases", [])
    mariadb_down = any(i.component_name == "mariadb" for i in db_issues)
    postgres_down = any(i.component_name == "postgres" for i in db_issues)
    if mariadb_down:
        for i in buckets.get("check_mariadb_dependents", []) + buckets.get("check_ghost_blogs", []):
            if i.root_cause is None:
                i.root_cause = "mariadb unreachable"
    if postgres_down:
        for i in buckets.get("check_postgres_dependents", []):
            if i.root_cause is None:
                i.root_cause = "postgres unreachable"

    out = []
    for fn in CHECKS:
        out.extend(buckets.get(fn.__name__, []))
    return out


# ---------- NATS publish ----------


async def _publish(url: str, subject: str, payloads: list[bytes]) -> None:
    """Connect to NATS, publish each payload on *subject*, flush, and close."""
    # Imported lazily so --dry-run does not need the nats client installed.
    import nats  # type: ignore[import-not-found]
    # allow_reconnect=False plus the outer wait_for keeps a dead server from
    # stalling the timer-driven run.
    nc = await asyncio.wait_for(
        nats.connect(url, connect_timeout=3, allow_reconnect=False),  # type: ignore[attr-defined]
        timeout=8,
    )
    try:
        for p in payloads:
            await nc.publish(subject, p)
        await nc.flush()
    finally:
        await nc.close()


def publish_issues(issues: list[Issue], cfg: dict) -> None:
    """Publish each issue as a UTF-8 JSON message to ``cfg["nats"]``; no-op when empty."""
    if not issues:
        return
    payloads = [json.dumps(i.to_dict()).encode("utf-8") for i in issues]
    asyncio.run(_publish(cfg["nats"]["url"], cfg["nats"]["subject"], payloads))


# ---------- entry ----------


def load_config(path: str) -> dict:
    """Load the JSON check configuration from *path* (read as UTF-8)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def main(argv=None) -> int:
    """CLI entry point.

    Returns 0 when every check is green, and 1 whenever any issue was found —
    whether it was printed (--dry-run), published, or failed to publish.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s",
                        datefmt="%Y-%m-%dT%H:%M:%S%z")
    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="/home/samantha/homelab-health/checks.json")
    ap.add_argument("--dry-run", action="store_true",
                    help="Print issues to stdout; do not publish to NATS")
    args = ap.parse_args(argv)

    cfg = load_config(args.config)
    issues = run_all_checks(cfg)
    for i in issues:
        log.warning("issue: %s", json.dumps(i.to_dict()))

    if not issues:
        log.info("all checks green")
        return 0

    if args.dry_run:
        for i in issues:
            print(json.dumps(i.to_dict()))
        return 1

    try:
        publish_issues(issues, cfg)
    except Exception as e:
        log.error("NATS publish failed: %s (issues not delivered)", e)
        return 1
    # Issues were delivered, but the run is still reported as non-zero.
    return 1


if __name__ == "__main__":
    sys.exit(main())