Python checker that runs on pve-control via a systemd timer every 10 minutes and publishes issues to the NATS subject homelab_health_issue. It checks NATS, Postgres, MariaDB, Ghost blogs, DB dependents, standalone services, and every NodePort, and stays silent when healthy. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
308 lines
9.1 KiB
Python
308 lines
9.1 KiB
Python
"""Homelab internal health checker.
|
|
|
|
Runs on pve-control every 10 minutes via systemd timer. Publishes issue
|
|
events to NATS subject `homelab_health_issue`. Silent when healthy.
|
|
See home_lab_health.md for design.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Sequence
|
|
|
|
import requests
|
|
|
|
|
|
# Module-level logger; handlers/format are configured in main() via logging.basicConfig.
log = logging.getLogger("homelab-health")
|
|
|
|
|
|
# ---------- payload ----------
|
|
|
|
@dataclass
class Issue:
    """A single detected health problem; one Issue becomes one NATS message.

    ``root_cause`` is optional context (for example set by run_all_checks when
    a failure is attributed to a down database). It is omitted from the
    serialized payload entirely when it is None.
    """

    component_name: str
    issue_detail: str
    detected_at: str
    root_cause: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the fields as a plain dict, dropping ``root_cause`` if it is None."""
        return {
            key: value
            for key, value in asdict(self).items()
            if key != "root_cause" or value is not None
        }
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC time as ISO-8601 text, truncated to whole seconds."""
    return datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
|
|
|
|
|
|
# ---------- kubectl helpers ----------
|
|
|
|
def kubectl_json(args: Sequence[str], timeout: int = 15) -> dict:
    """Run ``kubectl <args> -o json`` and return the decoded JSON output.

    Raises RuntimeError, carrying kubectl's stderr, when kubectl exits nonzero.
    subprocess.TimeoutExpired (after ``timeout`` seconds) and JSON decode
    errors are not caught here and reach the caller unchanged.
    """
    completed = subprocess.run(
        ["kubectl", *args, "-o", "json"],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    if completed.returncode == 0:
        return json.loads(completed.stdout)
    raise RuntimeError(f"kubectl {' '.join(args)}: {completed.stderr.strip()}")
|
|
|
|
|
|
@dataclass
class ExecResult:
    """Outcome of kubectl_exec: a success flag plus captured output."""

    ok: bool  # True only when the `kubectl exec` process exited with status 0
    stdout: str  # exec's captured stdout; "" when the pod lookup failed or a timeout hit
    stderr: str  # stripped stderr from `kubectl exec`, or a short diagnostic written by kubectl_exec
|
|
|
|
|
|
def kubectl_exec(namespace: str, pod_label: str, command: Sequence[str],
                 timeout: int = 10) -> ExecResult:
    """Run a command inside the first Running pod matching pod_label.

    Only pods in phase Running are considered, so a lingering Failed/Evicted
    or Pending pod that happens to be listed first cannot turn into a false
    "probe failed" result.

    Timeouts and nonzero kubectl exits are reported as ExecResult(ok=False, ...)
    with a diagnostic in ``stderr`` rather than raised.

    Args:
        namespace: Kubernetes namespace to search.
        pod_label: label selector passed to ``kubectl get pod -l``.
        command: argv executed inside the pod after ``--``.
        timeout: seconds allowed for ``kubectl exec`` (the pod lookup has its
            own fixed 5 s budget).
    """
    try:
        lookup = subprocess.run(
            ["kubectl", "get", "pod", "-n", namespace, "-l", pod_label,
             "--field-selector", "status.phase=Running",
             # items[*] (not items[0]) so "no match" is an empty, successful
             # result instead of a jsonpath index error; that lets a real
             # kubectl failure (API down, RBAC, ...) be reported distinctly.
             "-o", "jsonpath={.items[*].metadata.name}"],
            capture_output=True, text=True, timeout=5,
        )
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", "pod lookup timed out")
    if lookup.returncode != 0:
        detail = lookup.stderr.strip() or "(no stderr)"
        return ExecResult(False, "", f"pod lookup failed: {detail}")
    pod_names = lookup.stdout.split()
    if not pod_names:
        return ExecResult(False, "", f"no Running pod matched {pod_label} in {namespace}")
    pod = pod_names[0]

    try:
        r = subprocess.run(
            ["kubectl", "exec", "-n", namespace, pod, "--", *command],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return ExecResult(False, "", f"kubectl exec timed out after {timeout}s")
    return ExecResult(r.returncode == 0, r.stdout, r.stderr.strip())
|
|
|
|
|
|
def resolve_nodeport(namespace: str, service: str) -> Optional[int]:
    """Fetch ``service`` from ``namespace`` and return its first NodePort.

    Returns None when none of the service's port entries has a ``nodePort``.
    Errors raised by kubectl_json propagate to the caller.
    """
    spec = kubectl_json(["get", "svc", "-n", namespace, service]).get("spec", {})
    return next(
        (int(entry["nodePort"]) for entry in spec.get("ports", []) if "nodePort" in entry),
        None,
    )
|
|
|
|
|
|
# ---------- probes ----------
|
|
|
|
def probe_service(svc: dict) -> list[Issue]:
    """HTTP-probe one configured service through its NodePort on localhost.

    ``svc`` is one entry of the config's ``services`` list; this reads its
    ``disabled``, ``name``, ``namespace``, ``probe_path`` and ``expected``
    keys. Returns [] when the service is disabled or answers with a status
    contained in ``expected``; otherwise returns exactly one Issue. Redirects
    are not followed, so a 3xx passes only if it is listed in ``expected``.
    """
    if svc.get("disabled"):
        return []
    name = svc["name"]

    def _fail(detail: str) -> list[Issue]:
        return [Issue(name, detail, now_iso())]

    try:
        nodeport = resolve_nodeport(svc["namespace"], name)
    except Exception as exc:
        return _fail(f"kubectl failed: {exc}")
    if nodeport is None:
        return _fail("no NodePort exposed")

    url = f"http://localhost:{nodeport}{svc['probe_path']}"
    try:
        response = requests.get(url, timeout=10, allow_redirects=False)
    except requests.RequestException as exc:
        return _fail(f"probe error at {url}: {exc}")

    if response.status_code in svc["expected"]:
        return []
    return _fail(f"HTTP {response.status_code} at {url} (expected {svc['expected']})")
|
|
|
|
|
|
# ---------- check functions ----------
|
|
|
|
def check_nats(cfg: dict) -> list[Issue]:
    """Probe NATS's ``/healthz`` via its monitoring NodePort on localhost.

    Reads ``cfg["nats"]["monitoring_nodeport"]``. Returns [] on HTTP 200,
    otherwise a single Issue for component ``nats``.
    """
    monitoring_port = cfg["nats"]["monitoring_nodeport"]
    healthz_url = f"http://localhost:{monitoring_port}/healthz"
    try:
        response = requests.get(healthz_url, timeout=5)
    except requests.RequestException as exc:
        return [Issue("nats", f"monitoring unreachable: {exc}", now_iso())]
    if response.status_code == 200:
        return []
    return [Issue("nats", f"/healthz returned {response.status_code}", now_iso())]
|
|
|
|
|
|
def check_databases(cfg: dict) -> list[Issue]:
    """Run each configured database's liveness command inside its pod.

    Each entry of ``cfg["databases"]`` supplies ``name``, ``namespace``,
    ``pod_label`` and ``probe_cmd``. A failed probe yields one Issue whose
    component_name is the entry's ``name``; run_all_checks compares these
    names against "mariadb" / "postgres" for root-cause attribution.
    """
    found: list[Issue] = []
    for entry in cfg.get("databases", []):
        outcome = kubectl_exec(entry["namespace"], entry["pod_label"], entry["probe_cmd"])
        if outcome.ok:
            continue
        reason = outcome.stderr or "(no stderr)"
        found.append(Issue(entry["name"], f"liveness probe failed: {reason}", now_iso()))
    return found
|
|
|
|
|
|
def _filter_probe(services, pred) -> list[Issue]:
    """Probe every service in ``services`` for which ``pred(service)`` is true.

    A probe that raises (e.g. a config entry missing ``probe_path`` or
    ``expected``) is reported as its own Issue with root_cause
    "healthcheck bug" instead of propagating. Otherwise one malformed entry
    would make the whole calling check fail and hide the results of every
    other service in that category. Exceptions raised by ``pred`` itself
    still propagate to the caller.
    """
    out: list[Issue] = []
    for s in services:
        if not pred(s):
            continue
        try:
            out.extend(probe_service(s))
        except Exception as e:  # per-service boundary; see docstring
            out.append(Issue(
                f"healthcheck.{s.get('name', '?')}",
                f"probe raised: {type(e).__name__}: {e}",
                now_iso(),
                root_cause="healthcheck bug",
            ))
    return out
|
|
|
|
|
|
def check_ghost_blogs(cfg):
    """Probe every configured service whose name starts with "ghost"."""
    def is_ghost(service):
        return service["name"].startswith("ghost")

    return _filter_probe(cfg.get("services", []), is_ghost)
|
|
|
|
|
|
def check_mariadb_dependents(cfg):
    """Probe services whose ``db`` is "mariadb", excluding Ghost blogs.

    Names starting with "ghost" are skipped because check_ghost_blogs
    already probes them.
    """
    def uses_mariadb(service):
        if service["db"] != "mariadb":
            return False
        return not service["name"].startswith("ghost")

    return _filter_probe(cfg.get("services", []), uses_mariadb)
|
|
|
|
|
|
def check_postgres_dependents(cfg):
    """Probe every configured service whose ``db`` is "postgres"."""
    def uses_postgres(service):
        return service["db"] == "postgres"

    return _filter_probe(cfg.get("services", []), uses_postgres)
|
|
|
|
|
|
def check_standalone_services(cfg):
    """Probe every configured service with no database (``db`` is None/null)."""
    def has_no_db(service):
        return service["db"] is None

    return _filter_probe(cfg.get("services", []), has_no_db)
|
|
|
|
|
|
def check_all_nodeports(_cfg) -> list[Issue]:
    """TCP-connect to every NodePort of every Service in the cluster.

    ``_cfg`` is unused; it exists so this function matches the other checks'
    signature. Each NodePort is tried on 127.0.0.1 with a 3 s socket timeout,
    and every nonzero ``connect_ex`` result becomes an Issue named
    ``<namespace>/<service>``. kubectl errors propagate to the caller.
    """
    def connect_errno(port: int) -> int:
        # connect_ex returns an error indicator (0 on success) instead of
        # raising for connect() failures.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(3)
            return probe.connect_ex(("127.0.0.1", port))

    listing = kubectl_json(["get", "svc", "-A"])
    problems: list[Issue] = []
    for item in listing.get("items", []):
        meta = item.get("metadata", {})
        label = f"{meta.get('namespace')}/{meta.get('name')}"
        for port_spec in item.get("spec", {}).get("ports", []):
            node_port = port_spec.get("nodePort")
            if node_port is None:
                continue
            errno_value = connect_errno(int(node_port))
            if errno_value:
                problems.append(Issue(
                    label,
                    f"NodePort {node_port} not accepting TCP (errno {errno_value})",
                    now_iso(),
                ))
    return problems
|
|
|
|
|
|
# ---------- orchestration ----------
|
|
|
|
# Every check takes the parsed config dict and returns a list of Issue.
# run_all_checks emits issues in this list's order and keys each check's
# results by its function __name__; root-cause attribution looks up
# "check_databases", "check_mariadb_dependents", "check_ghost_blogs" and
# "check_postgres_dependents" by those names, so renaming them breaks it.
CHECKS = [
    check_nats,
    check_databases,
    check_ghost_blogs,
    check_mariadb_dependents,
    check_postgres_dependents,
    check_standalone_services,
    check_all_nodeports,
]
|
|
|
|
|
|
def run_all_checks(cfg: dict) -> list[Issue]:
    """Run every function in CHECKS and return all issues in CHECKS order.

    A check that raises does not abort the run. Its results are replaced by a
    single ``healthcheck.<check name>`` Issue with root_cause
    "healthcheck bug", and the full traceback is logged via
    ``log.exception``, because only ``str(e)`` reaches the published Issue.

    After all checks have run, if check_databases reported "mariadb" (or
    "postgres") down, issues from the checks that depend on it that have no
    root cause yet are tagged "mariadb unreachable" ("postgres unreachable").
    """
    buckets: dict[str, list[Issue]] = {}
    for fn in CHECKS:
        try:
            buckets[fn.__name__] = fn(cfg)
        except Exception as e:
            log.exception("check %s raised", fn.__name__)
            buckets[fn.__name__] = [Issue(
                f"healthcheck.{fn.__name__}",
                f"check raised: {type(e).__name__}: {e}",
                now_iso(),
                root_cause="healthcheck bug",
            )]

    # check_databases names its issues after the configured database.
    down_dbs = {i.component_name for i in buckets.get("check_databases", [])}
    dependent_checks = {
        "mariadb": ("check_mariadb_dependents", "check_ghost_blogs"),
        "postgres": ("check_postgres_dependents",),
    }
    for db_name, check_names in dependent_checks.items():
        if db_name not in down_dbs:
            continue
        for check_name in check_names:
            for issue in buckets.get(check_name, []):
                # Never overwrite a more specific cause set by the check itself.
                if issue.root_cause is None:
                    issue.root_cause = f"{db_name} unreachable"

    return [issue for fn in CHECKS for issue in buckets.get(fn.__name__, [])]
|
|
|
|
|
|
# ---------- NATS publish ----------
|
|
|
|
async def _publish(url: str, subject: str, payloads: list[bytes]) -> None:
    """Connect to NATS at ``url``, publish each payload on ``subject``, then close.

    The connection attempt is bounded by an 8 s overall wait (3 s connect
    timeout, reconnects disabled). Messages are flushed before closing, and
    the connection is closed even when publishing or flushing fails.
    """
    import nats  # type: ignore[import-not-found]

    connecting = nats.connect(url, connect_timeout=3, allow_reconnect=False)  # type: ignore[attr-defined]
    client = await asyncio.wait_for(connecting, timeout=8)
    try:
        for message in payloads:
            await client.publish(subject, message)
        await client.flush()
    finally:
        await client.close()
|
|
|
|
|
|
def publish_issues(issues: list[Issue], cfg: dict) -> None:
    """Publish each issue as one UTF-8 JSON message; does nothing for an empty list.

    Uses ``cfg["nats"]["url"]`` and ``cfg["nats"]["subject"]``. Connection
    and publish errors propagate to the caller.
    """
    if not issues:
        return
    encoded = [json.dumps(issue.to_dict()).encode("utf-8") for issue in issues]
    nats_cfg = cfg["nats"]
    asyncio.run(_publish(nats_cfg["url"], nats_cfg["subject"], encoded))
|
|
|
|
|
|
# ---------- entry ----------
|
|
|
|
def load_config(path: str) -> dict:
    """Read the JSON checks config at ``path`` and return the parsed object.

    The file is decoded explicitly as UTF-8 (the encoding JSON requires)
    instead of the locale's preferred encoding, which depends on the runtime
    environment and may not handle non-ASCII text. OSError and
    json.JSONDecodeError propagate to the caller.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
def main(argv=None) -> int:
    """CLI entry point: run all checks, log every issue, then publish or print them.

    Exit status: 0 when every check is green. 1 when any issue was found,
    whether the issues were printed (``--dry-run``), published to NATS, or
    the publish failed (which is logged as an error). Errors while loading
    the config file are not caught here.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
    )

    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="/home/samantha/homelab-health/checks.json")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print issues to stdout; do not publish to NATS",
    )
    opts = parser.parse_args(argv)

    cfg = load_config(opts.config)
    found = run_all_checks(cfg)

    for issue in found:
        log.warning("issue: %s", json.dumps(issue.to_dict()))

    if not found:
        log.info("all checks green")
        return 0

    if opts.dry_run:
        for issue in found:
            print(json.dumps(issue.to_dict()))
    else:
        try:
            publish_issues(found, cfg)
        except Exception as exc:
            log.error("NATS publish failed: %s (issues not delivered)", exc)
    return 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|