#!/usr/bin/env python3 """Classify sustained host load and emit a controlled automation packet. The controller is intentionally read-only by default. It turns HostLoadAverageSustainedHigh from a generic "SSH and look around" alert into a deterministic AI Agent control packet: * orphan browser/smoke load -> gated SIGTERM helper dry-run, then controlled apply with evidence and post-apply verifier * active Gitea Actions/BuildKit load -> runner pressure stays fail-closed; drain/cancel decisions must use runner/CD verifiers, not process kills * unknown or critical pressure -> source-specific playbook or break-glass It never reads secrets, raw runner registrations, sessions, or environment files, and it never mutates host state. """ from __future__ import annotations import argparse import json import re import subprocess import time from pathlib import Path from typing import Any DEFAULT_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom") DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom") DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS = 300 DEFAULT_SCRIPT_DIR = Path("/home/wooo/scripts") SCHEMA_VERSION = "host_sustained_load_controlled_automation_v1" LABEL_RE = re.compile(r"(?P[A-Za-z_][A-Za-z0-9_]*)=\"(?P(?:[^\"\\\\]|\\\\.)*)\"") METRIC_RE = re.compile( r"^(?P[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P[^}]*)\})?\s+" r"(?P[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$" ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Build a controlled AI Agent packet for sustained host load." ) parser.add_argument("--host", default="110") parser.add_argument("--metrics-file", type=Path, default=DEFAULT_METRICS_FILE) parser.add_argument("--docker-stats-file", type=Path, default=DEFAULT_DOCKER_STATS_FILE) parser.add_argument( "--docker-stats-max-age-seconds", type=int, default=DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS, ) parser.add_argument("--load5-per-core-threshold", type=float, default=1.5) parser.add_argument("--container-cpu-threshold", type=float, default=2.0) parser.add_argument("--hot-container-cpu-threshold", type=float, default=1.0) parser.add_argument("--process-family-cpu-threshold", type=float, default=50.0) parser.add_argument("--ci-stale-age-seconds", type=int, default=1800) parser.add_argument("--script-dir", type=Path, default=DEFAULT_SCRIPT_DIR) parser.add_argument("--ps-file", type=Path) parser.add_argument("--top-n", type=int, default=8) parser.add_argument("--json", action="store_true", help="Print JSON only.") return parser.parse_args() def _unescape_label(value: str) -> str: return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n") def parse_prometheus_text(text: str) -> list[dict[str, Any]]: samples: list[dict[str, Any]] = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line or line.startswith("#"): continue match = METRIC_RE.match(line) if not match: continue labels = { item.group("key"): _unescape_label(item.group("value")) for item in LABEL_RE.finditer(match.group("labels") or "") } samples.append( { "name": match.group("name"), "labels": labels, "value": float(match.group("value")), } ) return samples def _sample_value( samples: list[dict[str, Any]], name: str, *, host: str, labels: dict[str, str] | None = None, default: float = 0.0, ) -> float: expected = {"host": host, **(labels or {})} for sample in samples: if sample["name"] != name: continue sample_labels = sample["labels"] if all(sample_labels.get(key) == value for key, value in expected.items()): return float(sample["value"]) return default def _sample_value_any(samples: list[dict[str, Any]], name: str) -> float | None: for sample in samples: if sample["name"] == name: return float(sample["value"]) return None def _textfile_mtime_seconds(samples: list[dict[str, Any]], suffix: str) -> float | None: for sample in samples: if sample["name"] != "node_textfile_mtime_seconds": continue file_label = str(sample["labels"].get("file") or "") if file_label.endswith(suffix): return float(sample["value"]) return None def docker_stats_freshness( *, samples: list[dict[str, Any]], docker_stats_file: Path, max_age_seconds: int, ) -> dict[str, Any]: mtime = _textfile_mtime_seconds(samples, "docker_stats.prom") now = _sample_value_any(samples, "node_time_seconds") source = "node_textfile_mtime_seconds" if mtime is None: try: mtime = docker_stats_file.stat().st_mtime now = time.time() source = "file_stat_mtime" except FileNotFoundError: return { "fresh": False, "age_seconds": None, "max_age_seconds": max_age_seconds, "source": "missing", } if now is None: now = time.time() age_seconds = max(0, int(now - mtime)) return { "fresh": age_seconds <= max_age_seconds, "age_seconds": age_seconds, "max_age_seconds": max_age_seconds, "source": source, } def _rule_values(samples: list[dict[str, Any]], name: str, *, host: str) -> list[dict[str, Any]]: values = [] for sample in samples: if sample["name"] != name: continue labels = sample["labels"] if labels.get("host") != host: continue rule = labels.get("rule") if not rule: continue values.append({"rule": rule, "value": float(sample["value"])}) return values def _top_orphan_rule(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None: counts = _rule_values( samples, "awoooi_host_runaway_browser_orphan_group_count", host=host, ) cpu_by_rule = { item["rule"]: item["value"] for item in _rule_values( samples, "awoooi_host_runaway_browser_orphan_cpu_percent", host=host, ) } candidates = [ { "rule": item["rule"], "group_count": int(item["value"]), "cpu_percent": round(cpu_by_rule.get(item["rule"], 0.0), 3), } for item in counts if item["value"] > 0 ] if not candidates: return None return sorted(candidates, key=lambda item: (-item["cpu_percent"], item["rule"]))[0] def _top_container_cpu(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None: candidates = [] for sample in samples: if sample["name"] != "docker_container_cpu_cores": continue labels = sample["labels"] if labels.get("host", host) != host: continue candidates.append( { "container_name": labels.get("container_name") or labels.get("name") or "unknown", "cpu_cores": round(float(sample["value"]), 6), } ) if not candidates: return None return sorted(candidates, key=lambda item: (-item["cpu_cores"], item["container_name"]))[0] def _read_text(path: Path | None) -> str: if path is None: return "" try: return path.read_text(encoding="utf-8") except FileNotFoundError: return "" def _collect_ps_text(ps_file: Path | None) -> str: if ps_file is not None: return _read_text(ps_file) try: result = subprocess.run( ["ps", "-eo", "pid=,ppid=,pgid=,etimes=,pcpu=,pmem=,comm=,args="], check=True, capture_output=True, text=True, timeout=10, ) except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): return "" return result.stdout def _classify_process_family(comm: str, args: str) -> str: text = f"{comm} {args}".lower() if ( "act_runner" in text or "gitea-actions-task" in text or "/.cache/act/" in text or "/opt/hostedtoolcache/" in text or "pnpm install" in text or "npm ci" in text or "yarn install" in text ): return "gitea_actions_runner" if "docker build" in text or "buildx" in text or "buildkit" in text: return "docker_build" if "next build" in text or "turbo build" in text or ("pnpm" in text and " build" in text): return "web_build" if "chrome" in text or "chromium" in text or "playwright" in text: return "headless_browser" if "gitea" in text: return "gitea_service" if "postgres" in text or "postmaster" in text: return "postgres" if "clickhouse" in text: return "clickhouse" if "kafka" in text: return "kafka" if "sentry" in text: return "sentry" if "systemctl" in text or "systemd" in text or "dbus" in text: return "systemd_control_plane" if "sshd" in text: return "ssh_control_plane" if "python" in text: return "python_job" if "node" in text: return "node_service" return "unknown" def _parse_ps_text(text: str) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line: continue parts = line.split(None, 7) if len(parts) < 7: continue pid, ppid, pgid, etimes, pcpu, pmem, comm = parts[:7] args = parts[7] if len(parts) > 7 else comm try: rows.append( { "pid": int(pid), "ppid": int(ppid), "pgid": int(pgid), "etimes": int(float(etimes)), "cpu_percent": float(pcpu), "mem_percent": float(pmem), "comm": Path(comm).name[:48], "family": _classify_process_family(comm, args), } ) except ValueError: continue return rows def _summarize_processes(rows: list[dict[str, Any]], *, top_n: int) -> dict[str, Any]: top_rows = sorted(rows, key=lambda item: (-item["cpu_percent"], item["comm"], item["pid"]))[:top_n] families: dict[str, dict[str, Any]] = {} for row in rows: family = row["family"] current = families.setdefault( family, { "family": family, "process_count": 0, "cpu_percent": 0.0, "max_age_seconds": 0, "sample_comm": "", }, ) current["process_count"] += 1 current["cpu_percent"] += row["cpu_percent"] current["max_age_seconds"] = max(current["max_age_seconds"], row["etimes"]) if not current["sample_comm"] or row["cpu_percent"] > current.get("_sample_cpu", -1): current["sample_comm"] = row["comm"] current["_sample_cpu"] = row["cpu_percent"] family_rows = [] for item in families.values(): item.pop("_sample_cpu", None) item["cpu_percent"] = round(float(item["cpu_percent"]), 3) family_rows.append(item) return { "top_processes": [ { "pid": row["pid"], "ppid": row["ppid"], "pgid": row["pgid"], "cpu_percent": round(row["cpu_percent"], 3), "mem_percent": round(row["mem_percent"], 3), "age_seconds": row["etimes"], "comm": row["comm"], "family": row["family"], } for row in top_rows ], "families": sorted(family_rows, key=lambda item: (-item["cpu_percent"], item["family"]))[ :top_n ], } def _family_cpu(process_summary: dict[str, Any], *families: str) -> float: return sum( float(item.get("cpu_percent") or 0.0) for item in process_summary.get("families", []) if item.get("family") in families ) def _top_family(process_summary: dict[str, Any]) -> dict[str, Any] | None: families = process_summary.get("families", []) return families[0] if families else None def build_packet( *, host: str, samples: list[dict[str, Any]], docker_samples: list[dict[str, Any]], docker_stats_status: dict[str, Any], process_summary: dict[str, Any], metrics_file: Path, docker_stats_file: Path, load5_per_core_threshold: float, container_cpu_threshold: float, hot_container_cpu_threshold: float, process_family_cpu_threshold: float, ci_stale_age_seconds: int, script_dir: Path = DEFAULT_SCRIPT_DIR, ) -> dict[str, Any]: monitor_up = int( _sample_value( samples, "awoooi_host_runaway_process_monitor_up", host=host, labels={"mode": "read_only"}, default=0, ) ) load5_per_core = _sample_value(samples, "awoooi_host_load5_per_core", host=host) swap_used_ratio = _sample_value(samples, "awoooi_host_swap_used_ratio", host=host) remediation_authorized = int( _sample_value( samples, "awoooi_host_runaway_process_remediation_authorized", host=host, ) ) active_ci_containers = int( _sample_value( samples, "awoooi_host_gitea_actions_active_container_count", host=host, default=0, ) ) active_ci_groups = int( _sample_value( samples, "awoooi_host_gitea_actions_active_process_group_count", host=host, default=0, ) ) active_ci_cpu = _sample_value( samples, "awoooi_host_gitea_actions_active_process_cpu_percent", host=host, ) active_ci_oldest_age = int( _sample_value( samples, "awoooi_host_gitea_actions_active_process_oldest_age_seconds", host=host, ) ) top_orphan = _top_orphan_rule(samples, host=host) raw_top_container = _top_container_cpu(docker_samples, host=host) top_container = raw_top_container if docker_stats_status.get("fresh") is True else None top_container_name = str((top_container or {}).get("container_name") or "").lower() top_container_cpu = float((top_container or {}).get("cpu_cores") or 0.0) stock_process_cpu = _family_cpu(process_summary, "postgres") gitea_process_cpu = _family_cpu(process_summary, "gitea_service") control_plane_cpu = _family_cpu(process_summary, "systemd_control_plane", "ssh_control_plane") top_family = _top_family(process_summary) classification = "observing_load_within_threshold" severity = "info" controlled_apply_allowed = False next_action = "keep_read_only_monitoring" dry_run_command = "" controlled_apply_command = "" controller_script = script_dir / "host-sustained-load-controller.py" evidence_script = script_dir / "host-sustained-load-evidence.py" gitea_playbook_script = script_dir / "gitea-queue-hook-backlog-playbook.py" remediation_script = script_dir / "host-runaway-process-remediation.py" verifier_command = ( f"{controller_script} " f"--host {host} --metrics-file {metrics_file}" f" --docker-stats-file {docker_stats_file}" ) if monitor_up != 1: classification = "blocked_monitor_unavailable" severity = "warning" next_action = "restore_host_runaway_process_exporter_textfile_before_apply" elif remediation_authorized > 0: classification = "blocked_monitor_authority_violation" severity = "critical" next_action = "rollback_monitor_to_read_only_exporter" elif load5_per_core > load5_per_core_threshold and top_orphan: classification = "controlled_orphan_browser_remediation_ready" severity = "critical" controlled_apply_allowed = True rule = top_orphan["rule"] dry_run_command = f"{remediation_script} --rule {rule}" controlled_apply_command = ( f"{remediation_script} " f"--rule {rule} --apply --confirm-apply " "--controlled-apply-id ${CONTROLLED_APPLY_ID} " "--evidence-ref ${EVIDENCE_REF} " "--post-apply-verifier " f"'{controller_script} --host " f"{host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file}' " "--wait-seconds 10" ) next_action = "run_orphan_browser_remediation_dry_run_then_controlled_sigterm" elif ( load5_per_core > load5_per_core_threshold and (active_ci_containers > 0 or active_ci_groups > 0) ): classification = "controlled_ci_runner_saturation_guarded" severity = "critical" if active_ci_oldest_age >= ci_stale_age_seconds else "warning" controlled_apply_allowed = active_ci_oldest_age >= ci_stale_age_seconds dry_run_command = ( "ops/runner/read-public-gitea-actions-queue.py --json " "&& ops/runner/check-awoooi-non110-runner-readiness.sh" ) controlled_apply_command = ( "keep_110_runner_pressure_gate_fail_closed; " "only cancel/drain stale Gitea Actions through runner verifier packet" ) next_action = ( "prepare_runner_drain_or_cancel_packet_without_process_kill" if controlled_apply_allowed else "keep_pressure_gate_fail_closed_until_ci_load_clears" ) elif ( ( "stockplatform-v2-postgres-1" in top_container_name or top_container_name == "stockplatform-v2-api-1" ) and top_container_cpu >= container_cpu_threshold ) or stock_process_cpu >= process_family_cpu_threshold * 2: classification = "blocked_stockplatform_hot_query_or_api_pressure_requires_playbook" severity = ( "critical" if load5_per_core > load5_per_core_threshold or stock_process_cpu >= process_family_cpu_threshold * 2 else "warning" ) dry_run_command = ( f"{evidence_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "run_stockplatform_hot_query_or_api_pressure_playbook_check_mode" elif ( top_container_name == "gitea" and top_container_cpu >= container_cpu_threshold ) or gitea_process_cpu >= process_family_cpu_threshold * 2: classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook" severity = ( "critical" if load5_per_core > load5_per_core_threshold or gitea_process_cpu >= process_family_cpu_threshold * 2 else "warning" ) dry_run_command = ( f"{gitea_playbook_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode" elif ( "stockplatform-v2-postgres-1" in top_container_name and top_container_cpu >= hot_container_cpu_threshold ) or ( top_container_name == "stockplatform-v2-api-1" and top_container_cpu >= hot_container_cpu_threshold ) or stock_process_cpu >= process_family_cpu_threshold: classification = "blocked_stockplatform_hot_query_or_api_pressure_requires_playbook" severity = "critical" if load5_per_core > load5_per_core_threshold else "warning" dry_run_command = ( f"{evidence_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "run_stockplatform_hot_query_or_api_pressure_playbook_check_mode" elif ( top_container_name == "gitea" and top_container_cpu >= hot_container_cpu_threshold ) or gitea_process_cpu >= process_family_cpu_threshold: classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook" severity = "critical" if load5_per_core > load5_per_core_threshold else "warning" dry_run_command = ( f"{gitea_playbook_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode" elif control_plane_cpu >= process_family_cpu_threshold: classification = "blocked_control_plane_saturation_requires_playbook" severity = "critical" if load5_per_core > load5_per_core_threshold else "warning" dry_run_command = ( f"{evidence_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "run_control_plane_saturation_playbook_check_mode" elif load5_per_core > load5_per_core_threshold and swap_used_ratio >= 0.85: classification = "blocked_memory_or_swap_pressure_requires_service_playbook" severity = "critical" next_action = "route_to_service_specific_memory_pressure_playbook" elif load5_per_core > load5_per_core_threshold: classification = "blocked_unknown_sustained_load_requires_source_specific_playbook" severity = "critical" dry_run_command = ( f"{evidence_script} " f"--host {host} --metrics-file {metrics_file} " f"--docker-stats-file {docker_stats_file} --json" ) next_action = "collect_sanitized_top_process_and_container_stats_then_select_playbook" return { "schema_version": SCHEMA_VERSION, "host": host, "mode": "read_only_control_packet", "classification": classification, "severity": severity, "controlled_apply_allowed": controlled_apply_allowed, "next_action": next_action, "readback": { "monitor_up": monitor_up, "load5_per_core": round(load5_per_core, 6), "load5_per_core_threshold": load5_per_core_threshold, "container_cpu_threshold": container_cpu_threshold, "hot_container_cpu_threshold": hot_container_cpu_threshold, "process_family_cpu_threshold": process_family_cpu_threshold, "metrics_file": str(metrics_file), "docker_stats_file": str(docker_stats_file), "script_dir": str(script_dir), "swap_used_ratio": round(swap_used_ratio, 6), "remediation_authorized": remediation_authorized, "active_ci_container_count": active_ci_containers, "active_ci_process_group_count": active_ci_groups, "active_ci_process_cpu_percent": round(active_ci_cpu, 3), "active_ci_oldest_age_seconds": active_ci_oldest_age, "top_orphan_rule": top_orphan, "top_container_cpu": top_container, "top_container_cpu_untrusted": raw_top_container, "docker_stats": docker_stats_status, "top_process_family": top_family, "stock_process_cpu_percent": round(stock_process_cpu, 3), "gitea_process_cpu_percent": round(gitea_process_cpu, 3), "control_plane_process_cpu_percent": round(control_plane_cpu, 3), }, "commands": { "dry_run": dry_run_command, "controlled_apply": controlled_apply_command, "post_apply_verifier": verifier_command, "rollback": "send SIGTERM only; no persistent host mutation. Re-run workload if needed.", }, "operation_boundaries": { "secret_value_read": False, "raw_session_read": False, "raw_runner_registration_read": False, "host_write_performed": False, "process_signal_performed": False, "docker_restart_allowed": False, "systemd_restart_allowed": False, "firewall_change_allowed": False, "critical_break_glass_required": True, }, "forbidden_actions": [ "SIGKILL", "docker_restart", "systemctl_restart", "nginx_reload", "firewall_change", "kubectl_action", "secret_read", "legacy_or_generic_runner_restore", ], } def main() -> int: args = parse_args() try: text = args.metrics_file.read_text(encoding="utf-8") samples = parse_prometheus_text(text) except FileNotFoundError: samples = [] try: docker_text = args.docker_stats_file.read_text(encoding="utf-8") docker_samples = parse_prometheus_text(docker_text) except FileNotFoundError: docker_samples = [] packet = build_packet( host=args.host, samples=samples, docker_samples=docker_samples, docker_stats_status=docker_stats_freshness( samples=samples, docker_stats_file=args.docker_stats_file, max_age_seconds=args.docker_stats_max_age_seconds, ), process_summary=_summarize_processes(_parse_ps_text(_collect_ps_text(args.ps_file)), top_n=args.top_n), metrics_file=args.metrics_file, docker_stats_file=args.docker_stats_file, load5_per_core_threshold=args.load5_per_core_threshold, container_cpu_threshold=args.container_cpu_threshold, hot_container_cpu_threshold=args.hot_container_cpu_threshold, process_family_cpu_threshold=args.process_family_cpu_threshold, ci_stale_age_seconds=args.ci_stale_age_seconds, script_dir=args.script_dir, ) if args.json: print(json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True)) else: print(f"status={packet['classification']}") print(f"controlled_apply_allowed={str(packet['controlled_apply_allowed']).lower()}") print(f"next_action={packet['next_action']}") if packet["commands"]["dry_run"]: print(f"dry_run_command={packet['commands']['dry_run']}") if packet["commands"]["controlled_apply"]: print(f"controlled_apply_command={packet['commands']['controlled_apply']}") print(f"post_apply_verifier={packet['commands']['post_apply_verifier']}") return 0 if not packet["classification"].startswith("blocked_") else 75 if __name__ == "__main__": raise SystemExit(main())