Files
awoooi/scripts/reboot-recovery/reboot-event-detector.py
Your Name c6ec7a3b71
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
fix(recovery): detect all-host reboot recovery gaps
2026-06-30 18:21:56 +08:00

237 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""Stateful all-host reboot event detector for AWOOOI recovery automation."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
# Schema tag stamped into both the persisted state file and the JSON report.
SCHEMA_VERSION = "awoooi_reboot_event_detector_v1"
# Host aliases evaluated when no --required-host option is supplied.
REQUIRED_HOSTS = ("99", "110", "111", "112", "120", "121", "188")
def parse_args() -> argparse.Namespace:
    """Parse the detector's command-line options from ``sys.argv``.

    Returns:
        Namespace with ``host_probe_file`` and ``state_file`` (required
        paths), ``target_minutes`` (int, default 10), ``generated_at``
        (optional string), ``output`` and ``prometheus_output`` (optional
        paths), ``required_hosts`` (list of aliases, or ``None`` when the
        option was never given) and ``no_write_state`` (bool).
    """
    cli = argparse.ArgumentParser(
        description="Detect host reboot events from reboot-auto-recovery-host-probe output.",
    )

    # Plain single-value options, listed in the order they appear in --help.
    simple_options = (
        ("--host-probe-file", {"type": Path, "required": True}),
        ("--state-file", {"type": Path, "required": True}),
        ("--target-minutes", {"type": int, "default": 10}),
        ("--generated-at", {}),
        ("--output", {"type": Path}),
        ("--prometheus-output", {"type": Path}),
    )
    for flag, settings in simple_options:
        cli.add_argument(flag, **settings)

    cli.add_argument(
        "--required-host",
        dest="required_hosts",
        action="append",
        help="Required host alias. May be passed more than once.",
    )
    cli.add_argument(
        "--no-write-state",
        action="store_true",
        help="Evaluate without updating the state file.",
    )
    return cli.parse_args()
def int_value(value: Any, default: int = -1) -> int:
    """Convert *value* to ``int`` by way of its string form.

    Args:
        value: Anything; it is passed through ``str()`` before ``int()``.
        default: Result when the conversion raises ``TypeError`` or
            ``ValueError``.

    Returns:
        The parsed integer, or *default* when the text is not an integer
        literal (this includes ``None`` and float-looking text such as
        ``"1.5"``).
    """
    try:
        number = int(str(value))
    except (TypeError, ValueError):
        number = default
    return number
def parse_host_probe(text: str) -> list[dict[str, Any]]:
    """Extract one record per ``HOST_BOOT`` line of host-probe output.

    A line counts only if, once stripped, it begins with ``HOST_BOOT ``.
    Its remaining whitespace-separated ``key=value`` tokens become string
    fields; tokens without ``=`` are dropped and a repeated key keeps its
    last value. Three fields are then normalised on every record:

    * ``alias`` -> ``str`` (empty string when absent)
    * ``reachable`` -> ``bool``, true only for the literal ``"1"``
    * ``uptime_seconds`` -> ``int`` via ``int_value`` (``-1`` when absent
      or not an integer)

    Args:
        text: Full probe output.

    Returns:
        Records in the order their lines appear in *text*.
    """
    marker = "HOST_BOOT "
    records: list[dict[str, Any]] = []
    for probe_line in (candidate.strip() for candidate in text.splitlines()):
        if not probe_line.startswith(marker):
            continue
        fields = probe_line.split()[1:]
        record: dict[str, Any] = dict(
            field.split("=", 1) for field in fields if "=" in field
        )
        record["alias"] = str(record.get("alias", ""))
        record["reachable"] = record.get("reachable") == "1"
        record["uptime_seconds"] = int_value(record.get("uptime_seconds"))
        records.append(record)
    return records
def load_state(path: Path) -> dict[str, Any]:
    """Load the detector state persisted by a previous run.

    The state file is best-effort input: a first run has none, and a
    damaged one must not stop reboot detection. A missing file, malformed
    JSON, bytes that are not valid UTF-8, or a top-level JSON value that is
    not an object all yield the empty state ``{"hosts": {}}``.

    Args:
        path: Location of the JSON state file.

    Returns:
        The decoded JSON object, or ``{"hosts": {}}`` when no usable state
        exists. A decoded object is returned as-is, so it is not guaranteed
        to contain a ``"hosts"`` key.

    Raises:
        OSError: For read failures other than a missing file (for example
            permission errors); these are left visible to the operator.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError: a truncated or corrupted file is as unusable
        # as malformed JSON and is handled the same way.
        return {"hosts": {}}
    return payload if isinstance(payload, dict) else {"hosts": {}}
def host_state(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index probe records by host alias in the shape stored in the state file.

    Records whose alias is empty or missing are skipped. When an alias
    occurs more than once, the last record wins.

    Args:
        rows: Probe records, e.g. as produced by ``parse_host_probe``.

    Returns:
        Mapping of alias to a dict with ``boot_id``, ``uptime_seconds``,
        ``reachable``, ``systemd_state``, ``startup_unit`` and
        ``startup_active``. Missing or empty text fields become
        ``"unknown"``; an unparseable uptime becomes ``-1``.
    """

    def text_or_unknown(record: dict[str, Any], field: str) -> str:
        # Empty or missing values collapse to the "unknown" placeholder.
        return str(record.get(field) or "unknown")

    snapshot: dict[str, dict[str, Any]] = {}
    for record in rows:
        alias = str(record.get("alias") or "")
        if alias:
            snapshot[alias] = {
                "boot_id": text_or_unknown(record, "boot_id"),
                "uptime_seconds": int_value(record.get("uptime_seconds")),
                "reachable": bool(record.get("reachable")),
                "systemd_state": text_or_unknown(record, "systemd_state"),
                "startup_unit": text_or_unknown(record, "startup_unit"),
                "startup_active": text_or_unknown(record, "startup_active"),
            }
    return snapshot
def _known_boot_id(entry: Any) -> str:
    """Return the boot_id stored in *entry*, or "" when absent or "unknown"."""
    if not isinstance(entry, dict):
        return ""
    boot_id = str(entry.get("boot_id") or "")
    return "" if boot_id == "unknown" else boot_id


def _merge_host_state(
    previous_hosts: dict[str, Any],
    current_hosts: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Combine this run's observations with the last known boot identifiers.

    Hosts absent from the current probe keep their previous entry, and a
    host whose current boot_id is unknown keeps the previously recorded
    one, so a later probe can still compare against it.
    """
    merged: dict[str, Any] = {
        alias: entry
        for alias, entry in previous_hosts.items()
        if alias not in current_hosts and isinstance(entry, dict)
    }
    for alias, entry in current_hosts.items():
        kept = dict(entry)
        if not _known_boot_id(kept):
            last_known = _known_boot_id(previous_hosts.get(alias))
            if last_known:
                kept["boot_id"] = last_known
        merged[alias] = kept
    return merged


def build_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Evaluate the host probe against the saved state and build the report.

    A required host counts as rebooted when its boot_id differs from the
    one in the state file (both must be known), or when it is reachable and
    its uptime is known and within the target window.

    Differences from a naive evaluation, all deliberate:

    * An unknown uptime (``-1`` from ``int_value``) is never a fresh boot
      and never enters ``max_observed_uptime_seconds`` or the deadline
      arithmetic.
    * The persisted state keeps the last known boot_id of hosts that are
      currently unreachable, report no boot_id, or are missing from the
      probe, so a reboot that happened during an outage is still detected
      once the host reports again.
    * Repeated ``--required-host`` values are collapsed (first occurrence
      wins) so every host yields at most one event.

    Side effects:
        Unless ``args.no_write_state`` is set, the state file is rewritten
        via a temporary sibling file followed by a rename.

    Args:
        args: Namespace as produced by ``parse_args``.

    Returns:
        JSON-serialisable report dictionary.

    Raises:
        OSError: If the probe file cannot be read or the state file cannot
            be written.
        ValueError: If ``args.generated_at`` is not an ISO 8601 timestamp.
    """
    generated_at = args.generated_at or datetime.now().astimezone().isoformat(timespec="seconds")
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    iso_text = generated_at[:-1] + "+00:00" if generated_at.endswith("Z") else generated_at
    observed_at = datetime.fromisoformat(iso_text)
    target_seconds = args.target_minutes * 60
    # dict.fromkeys() removes duplicates while keeping the given order.
    required_hosts = tuple(dict.fromkeys(args.required_hosts or REQUIRED_HOSTS))

    rows = parse_host_probe(args.host_probe_file.read_text(encoding="utf-8"))
    current_hosts = host_state(rows)
    previous = load_state(args.state_file)
    previous_hosts = previous.get("hosts") if isinstance(previous.get("hosts"), dict) else {}

    rebooted_hosts: list[str] = []
    fresh_boot_hosts: list[str] = []
    changed_boot_id_hosts: list[str] = []
    unreachable_hosts: list[str] = []
    missing_hosts = sorted(set(required_hosts) - set(current_hosts))
    events: list[dict[str, Any]] = []

    for alias in required_hosts:
        current = current_hosts.get(alias)
        if not current:
            continue
        reachable = bool(current["reachable"])
        if not reachable:
            unreachable_hosts.append(alias)

        uptime = int_value(current.get("uptime_seconds"))
        previous_boot_id = _known_boot_id(previous_hosts.get(alias))
        current_boot_id = _known_boot_id(current)
        boot_id_changed = bool(
            previous_boot_id and current_boot_id and previous_boot_id != current_boot_id
        )
        # A negative uptime means "unknown"; it must not look like a fresh boot.
        fresh_boot = reachable and 0 <= uptime <= target_seconds

        if boot_id_changed:
            changed_boot_id_hosts.append(alias)
        if fresh_boot:
            fresh_boot_hosts.append(alias)
        if boot_id_changed or fresh_boot:
            rebooted_hosts.append(alias)
            # Time left until the host has been up for the whole target window.
            seconds_left = max(0, target_seconds - max(0, uptime))
            events.append(
                {
                    "host": alias,
                    "event": "boot_id_changed" if boot_id_changed else "fresh_boot_window",
                    "previous_boot_id": previous_boot_id or "unknown",
                    "current_boot_id": current_boot_id or "unknown",
                    "uptime_seconds": current.get("uptime_seconds"),
                    "deadline_at": (observed_at + timedelta(seconds=seconds_left)).isoformat(
                        timespec="seconds"
                    ),
                }
            )

    reachable_uptimes = (
        int_value(row.get("uptime_seconds")) for row in rows if row.get("reachable")
    )
    max_uptime = max((value for value in reachable_uptimes if value >= 0), default=0)
    remaining_seconds = max(0, target_seconds - max_uptime)
    reboot_detected = bool(rebooted_hosts)
    all_required_observed = not missing_hosts and not unreachable_hosts
    all_required_in_reboot_window = set(required_hosts).issubset(set(fresh_boot_hosts))

    next_state = {
        "schema_version": SCHEMA_VERSION,
        "updated_at": generated_at,
        "target_seconds": target_seconds,
        "hosts": _merge_host_state(previous_hosts, current_hosts),
    }
    if not args.no_write_state:
        args.state_file.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling file and rename so readers never see a partial state.
        tmp = args.state_file.with_suffix(args.state_file.suffix + ".tmp")
        tmp.write_text(
            json.dumps(next_state, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
            encoding="utf-8",
        )
        tmp.replace(args.state_file)

    return {
        "schema_version": SCHEMA_VERSION,
        "generated_at": generated_at,
        "target_minutes": args.target_minutes,
        "target_seconds": target_seconds,
        "required_hosts": list(required_hosts),
        "observed_hosts": sorted(current_hosts),
        "missing_hosts": missing_hosts,
        "unreachable_hosts": sorted(unreachable_hosts),
        "reboot_detected": reboot_detected,
        "rebooted_hosts": sorted(set(rebooted_hosts)),
        "fresh_boot_hosts": sorted(set(fresh_boot_hosts)),
        "changed_boot_id_hosts": sorted(set(changed_boot_id_hosts)),
        "all_required_hosts_observed": all_required_observed,
        "all_required_hosts_in_reboot_window": all_required_in_reboot_window,
        "max_observed_uptime_seconds": max_uptime,
        "target_seconds_remaining": remaining_seconds,
        "recovery_deadline_status": "within_target_window" if remaining_seconds > 0 else "target_window_elapsed",
        "events": events,
        "state_file": str(args.state_file),
        "state_written": not args.no_write_state,
    }
def _escape_label_value(value: Any) -> str:
    """Escape backslash, double quote and newline for a Prometheus label value."""
    return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def write_prometheus(path: Path, payload: dict[str, Any]) -> None:
    """Write the report as Prometheus text-format gauges to *path*.

    The ``scope`` label is the required host aliases joined with ``_``, so
    it reflects the hosts that were actually evaluated. For the default
    host list this is the same value that used to be hard-coded, and that
    legacy value is also used when the payload lists no required hosts.

    All samples of a metric are emitted as one contiguous group, each
    preceded by its HELP/TYPE lines. The file is written to a temporary
    sibling and then renamed, so a scraper never reads a half-written file.

    Args:
        path: Destination file; parent directories are created if needed.
        payload: Report dictionary. ``reboot_detected``,
            ``target_seconds_remaining`` and ``max_observed_uptime_seconds``
            are required; ``required_hosts``, ``observed_hosts`` and
            ``rebooted_hosts`` are optional.

    Raises:
        KeyError: If a required payload key is missing.
        OSError: If the file cannot be written.
    """
    required_hosts = list(payload.get("required_hosts") or [])
    observed = set(payload.get("observed_hosts") or [])
    rebooted = set(payload.get("rebooted_hosts") or [])
    scope = _escape_label_value(
        "_".join(str(host) for host in required_hosts) or "99_110_111_112_120_121_188"
    )

    lines = [
        "# HELP awoooi_reboot_event_detected Whether a reboot event was detected by boot_id or fresh uptime.",
        "# TYPE awoooi_reboot_event_detected gauge",
        f'awoooi_reboot_event_detected{{scope="{scope}"}} {1 if payload["reboot_detected"] else 0}',
        "# HELP awoooi_reboot_event_required_host_observed Whether each required host was observed.",
        "# TYPE awoooi_reboot_event_required_host_observed gauge",
    ]
    lines.extend(
        f'awoooi_reboot_event_required_host_observed{{host="{_escape_label_value(host)}"}} '
        f"{1 if host in observed else 0}"
        for host in required_hosts
    )
    lines.extend(
        [
            "# HELP awoooi_reboot_event_host_rebooted Whether each required host was flagged as rebooted.",
            "# TYPE awoooi_reboot_event_host_rebooted gauge",
        ]
    )
    lines.extend(
        f'awoooi_reboot_event_host_rebooted{{host="{_escape_label_value(host)}"}} '
        f"{1 if host in rebooted else 0}"
        for host in required_hosts
    )
    lines.extend(
        [
            "# HELP awoooi_reboot_event_target_seconds_remaining Seconds remaining in the reboot recovery target window.",
            "# TYPE awoooi_reboot_event_target_seconds_remaining gauge",
            f'awoooi_reboot_event_target_seconds_remaining{{scope="{scope}"}} {payload["target_seconds_remaining"]}',
            "# HELP awoooi_reboot_event_max_observed_uptime_seconds Maximum uptime observed across reachable hosts.",
            "# TYPE awoooi_reboot_event_max_observed_uptime_seconds gauge",
            f'awoooi_reboot_event_max_observed_uptime_seconds{{scope="{scope}"}} {payload["max_observed_uptime_seconds"]}',
        ]
    )

    path.parent.mkdir(parents=True, exist_ok=True)
    # Appending ".tmp" to the full name keeps the partial file from matching
    # the destination's own extension while it is being written.
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text("\n".join(lines) + "\n", encoding="utf-8")
    tmp.replace(path)
def main() -> int:
    """Run the detector end to end and return the process exit status.

    Parses the command line, builds the report, then emits it as indented,
    key-sorted JSON: to ``--output`` when given (creating parent
    directories), otherwise to standard output. When
    ``--prometheus-output`` is given, the metrics file is written as well.

    Returns:
        Always ``0``.
    """
    options = parse_args()
    report = build_payload(options)
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n"

    destination = options.output
    if not destination:
        print(rendered, end="")
    else:
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered, encoding="utf-8")

    metrics_path = options.prometheus_output
    if metrics_path:
        write_prometheus(metrics_path, report)
    return 0
# Run the detector only when executed as a script; the value returned by
# main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())