Files
awoooi/scripts/ops/host-sustained-load-controller.py
Your Name 859e407129
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 1s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 1m3s
CD Pipeline / build-and-deploy (push) Successful in 4m58s
AWOOOI Harbor 110 Local Repair / workflow-shape (push) Successful in 1s
AWOOOI Harbor 110 Local Repair / harbor-110-local-repair (push) Successful in 10s
CD Pipeline / post-deploy-checks (push) Successful in 3m11s
fix(ops): use deployed host pressure helper paths
2026-07-02 12:14:35 +08:00

695 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""Classify sustained host load and emit a controlled automation packet.
The controller is intentionally read-only by default. It turns
HostLoadAverageSustainedHigh from a generic "SSH and look around" alert into a
deterministic AI Agent control packet:
* orphan browser/smoke load -> gated SIGTERM helper dry-run, then controlled
apply with evidence and post-apply verifier
* active Gitea Actions/BuildKit load -> runner pressure stays fail-closed;
drain/cancel decisions must use runner/CD verifiers, not process kills
* unknown or critical pressure -> source-specific playbook or break-glass
It never reads secrets, raw runner registrations, sessions, or environment
files, and it never mutates host state.
"""
from __future__ import annotations

import argparse
import json
import re
import shlex
import subprocess
import time
from pathlib import Path
from typing import Any
# Default node_exporter textfile-collector inputs read by this controller.
DEFAULT_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom")
DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom")
# Default max age of docker_stats.prom before its container CPU is treated as stale.
DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS = 300
# Directory of the deployed helper scripts referenced in emitted commands.
DEFAULT_SCRIPT_DIR = Path("/home/wooo/scripts")
SCHEMA_VERSION = "host_sustained_load_controlled_automation_v1"
# One `key="value"` pair inside a label set. The value may contain backslash
# escapes (\\, \", \n) as in the Prometheus text exposition format; `\\.`
# consumes exactly one backslash plus the escaped character.
LABEL_RE = re.compile(r'(?P<key>[A-Za-z_][A-Za-z0-9_]*)="(?P<value>(?:[^"\\]|\\.)*)"')
# One sample line: metric name, optional {label set}, value, optional integer
# timestamp. Quoted label values may contain `}` and escaped quotes.
METRIC_RE = re.compile(
    r'^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)'
    r'(?:\{(?P<labels>(?:[^"}]|"(?:[^"\\]|\\.)*")*)\})?\s+'
    r'(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)'
    r'(?:\s+[-+]?\d+)?$'
)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the controller's command-line options.

    Args:
        argv: Arguments to parse. ``None`` (the default) means ``sys.argv[1:]``,
            exactly as ``argparse.ArgumentParser.parse_args`` treats it; an
            explicit list lets tests and wrappers drive the CLI directly.

    Returns:
        The parsed namespace. Note: help strings must not contain a bare
        percent sign because argparse %-formats them.
    """
    parser = argparse.ArgumentParser(
        description="Build a controlled AI Agent packet for sustained host load."
    )
    parser.add_argument(
        "--host",
        default="110",
        help="host label value used to select samples; echoed into emitted commands",
    )
    parser.add_argument(
        "--metrics-file",
        type=Path,
        default=DEFAULT_METRICS_FILE,
        help="Prometheus textfile with the host runaway-process samples",
    )
    parser.add_argument(
        "--docker-stats-file",
        type=Path,
        default=DEFAULT_DOCKER_STATS_FILE,
        help="Prometheus textfile with docker_container_cpu_cores samples",
    )
    parser.add_argument(
        "--docker-stats-max-age-seconds",
        type=int,
        default=DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS,
        help="docker stats older than this are not trusted for classification",
    )
    parser.add_argument(
        "--load5-per-core-threshold",
        type=float,
        default=1.5,
        help="load5 per core above which host load counts as high",
    )
    parser.add_argument(
        "--container-cpu-threshold",
        type=float,
        default=2.0,
        help="container CPU cores for the strong stockplatform/gitea container signal",
    )
    parser.add_argument(
        "--hot-container-cpu-threshold",
        type=float,
        default=1.0,
        help="container CPU cores for the weaker stockplatform/gitea container signal",
    )
    parser.add_argument(
        "--process-family-cpu-threshold",
        type=float,
        default=50.0,
        help="per-family ps CPU percent threshold (doubled for the strong signal)",
    )
    parser.add_argument(
        "--ci-stale-age-seconds",
        type=int,
        default=1800,
        help="oldest active CI process age at which runner drain/cancel is allowed",
    )
    parser.add_argument(
        "--script-dir",
        type=Path,
        default=DEFAULT_SCRIPT_DIR,
        help="directory of deployed helper scripts referenced in emitted commands",
    )
    parser.add_argument(
        "--ps-file",
        type=Path,
        help="read ps output from this file instead of running ps",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=8,
        help="number of top processes and process families to report",
    )
    parser.add_argument("--json", action="store_true", help="Print JSON only.")
    return parser.parse_args(argv)
# A backslash plus the single character it escapes.
_LABEL_ESCAPE_RE = re.compile(r"\\(.)", re.DOTALL)
# Prometheus label-value escapes and their decoded characters.
_LABEL_ESCAPES = {"\\": "\\", '"': '"', "n": "\n"}


def _unescape_label(value: str) -> str:
    r"""Decode Prometheus label-value escapes in one left-to-right pass.

    ``\\`` becomes a backslash, ``\"`` a double quote and ``\n`` a newline.
    Any other backslash sequence, and a trailing lone backslash, is kept
    verbatim. A single pass is required: chained ``str.replace`` calls would
    re-interpret the output of an earlier replacement. For example, an escaped
    backslash followed by ``n`` would wrongly turn into a newline.
    """
    return _LABEL_ESCAPE_RE.sub(
        lambda match: _LABEL_ESCAPES.get(match.group(1), match.group(0)), value
    )
def parse_prometheus_text(text: str) -> list[dict[str, Any]]:
    """Parse Prometheus text-format samples into plain dicts.

    Blank lines, ``#`` comment/metadata lines, and lines that ``METRIC_RE``
    does not match are skipped. Each returned sample is a dict with ``name``
    (str), ``labels`` (label name -> unescaped value) and ``value`` (float).
    """
    parsed: list[dict[str, Any]] = []
    for line in map(str.strip, text.splitlines()):
        if not line or line[0] == "#":
            continue
        hit = METRIC_RE.match(line)
        if hit is None:
            continue
        label_text = hit.group("labels") or ""
        parsed.append(
            {
                "name": hit.group("name"),
                "labels": {
                    pair.group("key"): _unescape_label(pair.group("value"))
                    for pair in LABEL_RE.finditer(label_text)
                },
                "value": float(hit.group("value")),
            }
        )
    return parsed
def _sample_value(
    samples: list[dict[str, Any]],
    name: str,
    *,
    host: str,
    labels: dict[str, str] | None = None,
    default: float = 0.0,
) -> float:
    """Return the first ``name`` sample whose labels match ``host`` and ``labels``.

    The required label set is ``{"host": host}`` overlaid with ``labels``, so an
    explicit ``"host"`` entry in ``labels`` wins. Extra labels on a sample are
    ignored. When nothing matches, ``default`` is returned unchanged.
    """
    wanted: dict[str, str] = {"host": host}
    wanted.update(labels or {})
    matching_values = (
        float(sample["value"])
        for sample in samples
        if sample["name"] == name
        and all(sample["labels"].get(key) == want for key, want in wanted.items())
    )
    return next(matching_values, default)
def _sample_value_any(samples: list[dict[str, Any]], name: str) -> float | None:
    """Return the value of the first sample named ``name`` (any labels), else ``None``."""
    first = next((sample for sample in samples if sample["name"] == name), None)
    if first is None:
        return None
    return float(first["value"])
def _textfile_mtime_seconds(samples: list[dict[str, Any]], suffix: str) -> float | None:
    """Return the ``node_textfile_mtime_seconds`` value for a file ending in ``suffix``.

    The ``file`` label is compared with ``str.endswith``; a missing or empty
    label is treated as ``""``. The first qualifying sample wins; ``None`` means
    no such sample exists.
    """
    candidates = (
        sample
        for sample in samples
        if sample["name"] == "node_textfile_mtime_seconds"
        and str(sample["labels"].get("file") or "").endswith(suffix)
    )
    found = next(candidates, None)
    return None if found is None else float(found["value"])
def docker_stats_freshness(
    *,
    samples: list[dict[str, Any]],
    docker_stats_file: Path,
    max_age_seconds: int,
) -> dict[str, Any]:
    """Decide whether docker_stats.prom is recent enough to trust.

    The modification time is taken, in order of preference, from:

    * a ``node_textfile_mtime_seconds`` sample whose ``file`` label ends with
      ``docker_stats.prom``, compared with the ``node_time_seconds`` sample
      (or the local clock when that sample is absent); otherwise
    * ``docker_stats_file.stat()``, compared with the local clock.

    If the file cannot be stat'ed for any OS-level reason (missing, permission
    denied, a path component that is not a directory, ...), the result is
    ``fresh=False`` with ``source="missing"``. An unreadable stats file
    therefore degrades to "untrusted" instead of crashing the controller.

    Returns:
        ``fresh`` (bool), ``age_seconds`` (int clamped at 0, or ``None`` when
        missing), ``max_age_seconds`` (echoed) and ``source`` (where the mtime
        came from).
    """
    mtime = _textfile_mtime_seconds(samples, "docker_stats.prom")
    now = _sample_value_any(samples, "node_time_seconds")
    source = "node_textfile_mtime_seconds"
    if mtime is None:
        try:
            mtime = docker_stats_file.stat().st_mtime
        except OSError:
            return {
                "fresh": False,
                "age_seconds": None,
                "max_age_seconds": max_age_seconds,
                "source": "missing",
            }
        now = time.time()
        source = "file_stat_mtime"
    if now is None:
        now = time.time()
    # Clamp so an mtime slightly in the future (clock skew) reads as age 0.
    age_seconds = max(0, int(now - mtime))
    return {
        "fresh": age_seconds <= max_age_seconds,
        "age_seconds": age_seconds,
        "max_age_seconds": max_age_seconds,
        "source": source,
    }
def _rule_values(samples: list[dict[str, Any]], name: str, *, host: str) -> list[dict[str, Any]]:
    """List ``{"rule", "value"}`` pairs for ``name`` samples on ``host``.

    Only samples whose ``host`` label equals ``host`` exactly and whose ``rule``
    label is non-empty are included, in input order. Values are converted to float.
    """
    return [
        {"rule": sample["labels"]["rule"], "value": float(sample["value"])}
        for sample in samples
        if sample["name"] == name
        and sample["labels"].get("host") == host
        and sample["labels"].get("rule")
    ]
def _top_orphan_rule(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None:
    """Pick the orphan browser rule with the highest CPU on ``host``.

    Only rules with a positive ``awoooi_host_runaway_browser_orphan_group_count``
    are considered. Each rule's CPU comes from
    ``awoooi_host_runaway_browser_orphan_cpu_percent``; it defaults to 0.0 and
    is rounded to 3 places. Ties are broken by rule name. Returns ``None`` when
    no rule has active orphan groups.
    """
    cpu_for_rule = {
        entry["rule"]: entry["value"]
        for entry in _rule_values(
            samples, "awoooi_host_runaway_browser_orphan_cpu_percent", host=host
        )
    }
    active_rules = [
        {
            "rule": entry["rule"],
            "group_count": int(entry["value"]),
            "cpu_percent": round(cpu_for_rule.get(entry["rule"], 0.0), 3),
        }
        for entry in _rule_values(
            samples, "awoooi_host_runaway_browser_orphan_group_count", host=host
        )
        if entry["value"] > 0
    ]
    return min(
        active_rules,
        key=lambda candidate: (-candidate["cpu_percent"], candidate["rule"]),
        default=None,
    )
def _top_container_cpu(samples: list[dict[str, Any]], *, host: str) -> dict[str, Any] | None:
    """Return the busiest ``docker_container_cpu_cores`` sample for ``host``.

    Samples without a ``host`` label are accepted as belonging to ``host``. The
    container name comes from ``container_name``, then ``name``, then falls
    back to ``"unknown"``. CPU is rounded to 6 places, and ties go to the
    alphabetically smaller name. Returns ``None`` when no sample qualifies.
    """
    entries = [
        {
            "container_name": sample["labels"].get("container_name")
            or sample["labels"].get("name")
            or "unknown",
            "cpu_cores": round(float(sample["value"]), 6),
        }
        for sample in samples
        if sample["name"] == "docker_container_cpu_cores"
        and sample["labels"].get("host", host) == host
    ]
    return min(
        entries,
        key=lambda entry: (-entry["cpu_cores"], entry["container_name"]),
        default=None,
    )
def _read_text(path: Path | None) -> str:
    """Best-effort read of an optional UTF-8 text file.

    Returns ``""`` when ``path`` is ``None`` or when the file cannot be read
    for any OS-level reason (missing, permission denied, a directory, a path
    component that is not a directory, ...). Undecodable bytes are replaced
    with U+FFFD instead of raising, so a captured ``ps`` listing whose argument
    strings are not valid UTF-8 still yields usable text.
    """
    if path is None:
        return ""
    try:
        return path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def _collect_ps_text(ps_file: Path | None) -> str:
    """Return a process listing in ``pid ppid pgid etimes pcpu pmem comm args`` order.

    When ``ps_file`` is given, its contents are returned via ``_read_text``.
    Otherwise ``ps`` is run once with a 10 second timeout. A non-zero exit, a
    timeout, or any OS error launching it (missing binary, permission denied,
    ...) yields ``""``, so the packet simply has no process-level signal
    instead of the controller crashing. Output bytes that the locale codec
    cannot decode (possible in arbitrary process arguments) are replaced rather
    than raising ``UnicodeDecodeError``.
    """
    if ps_file is not None:
        return _read_text(ps_file)
    try:
        result = subprocess.run(
            ["ps", "-eo", "pid=,ppid=,pgid=,etimes=,pcpu=,pmem=,comm=,args="],
            check=True,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=10,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        return ""
    return result.stdout
# Keyword groups checked in order by _classify_process_family. The first family
# with a keyword present in the lower-cased "comm args" text wins, so the more
# specific families (CI runner, builds) must stay ahead of generic ones
# (gitea, python, node).
_PROCESS_FAMILY_KEYWORDS: tuple[tuple[str, tuple[str, ...]], ...] = (
    (
        "gitea_actions_runner",
        (
            "act_runner",
            "gitea-actions-task",
            "/.cache/act/",
            "/opt/hostedtoolcache/",
            "pnpm install",
            "npm ci",
            "yarn install",
        ),
    ),
    ("docker_build", ("docker build", "buildx", "buildkit")),
    ("web_build", ("next build", "turbo build")),
    ("headless_browser", ("chrome", "chromium", "playwright")),
    ("gitea_service", ("gitea",)),
    ("postgres", ("postgres", "postmaster")),
    ("clickhouse", ("clickhouse",)),
    ("kafka", ("kafka",)),
    ("sentry", ("sentry",)),
    ("systemd_control_plane", ("systemctl", "systemd", "dbus")),
    ("ssh_control_plane", ("sshd",)),
    ("python_job", ("python",)),
    ("node_service", ("node",)),
)


def _classify_process_family(comm: str, args: str) -> str:
    """Map a process to a coarse family label by case-insensitive keyword match.

    ``comm`` and ``args`` are joined with a space and lower-cased, then checked
    against ``_PROCESS_FAMILY_KEYWORDS`` in priority order. Returns
    ``"unknown"`` when nothing matches.
    """
    haystack = f"{comm} {args}".lower()
    for family, keywords in _PROCESS_FAMILY_KEYWORDS:
        if any(keyword in haystack for keyword in keywords):
            return family
        # web_build also covers `pnpm ... build`, which needs two separate
        # substrings rather than one contiguous keyword.
        if family == "web_build" and "pnpm" in haystack and " build" in haystack:
            return family
    return "unknown"
def _parse_ps_text(text: str) -> list[dict[str, Any]]:
    """Turn ``pid ppid pgid etimes pcpu pmem comm args`` lines into row dicts.

    Each stripped line is split into at most 8 whitespace-separated fields. The
    8th field (everything after ``comm``) is the argument string, and ``comm``
    stands in when it is absent. Lines with fewer than 7 fields, or whose
    numeric columns fail to parse, are skipped. Only the basename of ``comm``
    (truncated to 48 characters) is stored; the argument string is used solely
    to pick the ``family`` label.
    """
    parsed: list[dict[str, Any]] = []
    for line in (raw.strip() for raw in text.splitlines()):
        fields = line.split(None, 7)
        if len(fields) < 7:
            continue
        pid, ppid, pgid, etimes, pcpu, pmem, comm = fields[:7]
        arg_string = comm if len(fields) == 7 else fields[7]
        try:
            row = {
                "pid": int(pid),
                "ppid": int(ppid),
                "pgid": int(pgid),
                "etimes": int(float(etimes)),
                "cpu_percent": float(pcpu),
                "mem_percent": float(pmem),
                "comm": Path(comm).name[:48],
                "family": _classify_process_family(comm, arg_string),
            }
        except ValueError:
            continue
        parsed.append(row)
    return parsed
def _summarize_processes(rows: list[dict[str, Any]], *, top_n: int) -> dict[str, Any]:
    """Summarize parsed ``ps`` rows for reporting and classification.

    Args:
        rows: Rows as produced by ``_parse_ps_text`` (keys ``pid``, ``ppid``,
            ``pgid``, ``etimes``, ``cpu_percent``, ``mem_percent``, ``comm``,
            ``family``).
        top_n: Display cap applied to ``top_processes`` and ``families``.

    Returns:
        A dict with:

        * ``top_processes``: the ``top_n`` busiest processes, ordered by CPU
          descending, then comm, then pid.
        * ``families``: the ``top_n`` busiest families (CPU descending, then
          name). Each entry has the process count, summed CPU rounded to 3
          places, the oldest member's age, and the comm of the busiest member
          (the first one wins on ties).
        * ``family_cpu_percent``: the same rounded CPU total for every family,
          not capped by ``top_n``. Classification thresholds can use it so they
          do not depend on the display cap.
    """
    top_rows = sorted(
        rows, key=lambda item: (-item["cpu_percent"], item["comm"], item["pid"])
    )[:top_n]
    families: dict[str, dict[str, Any]] = {}
    busiest_member_cpu: dict[str, float] = {}
    for row in rows:
        family = row["family"]
        current = families.setdefault(
            family,
            {
                "family": family,
                "process_count": 0,
                "cpu_percent": 0.0,
                "max_age_seconds": 0,
                "sample_comm": "",
            },
        )
        current["process_count"] += 1
        current["cpu_percent"] += row["cpu_percent"]
        current["max_age_seconds"] = max(current["max_age_seconds"], row["etimes"])
        # The first member seeds the sample; later ones replace it only when
        # strictly busier. Tracking "seen" separately keeps an empty comm from
        # being overwritten by a less busy member.
        if family not in busiest_member_cpu or row["cpu_percent"] > busiest_member_cpu[family]:
            busiest_member_cpu[family] = row["cpu_percent"]
            current["sample_comm"] = row["comm"]
    for item in families.values():
        item["cpu_percent"] = round(float(item["cpu_percent"]), 3)
    ranked_families = sorted(
        families.values(), key=lambda item: (-item["cpu_percent"], item["family"])
    )
    return {
        "top_processes": [
            {
                "pid": row["pid"],
                "ppid": row["ppid"],
                "pgid": row["pgid"],
                "cpu_percent": round(row["cpu_percent"], 3),
                "mem_percent": round(row["mem_percent"], 3),
                "age_seconds": row["etimes"],
                "comm": row["comm"],
                "family": row["family"],
            }
            for row in top_rows
        ],
        "families": ranked_families[:top_n],
        "family_cpu_percent": {item["family"]: item["cpu_percent"] for item in ranked_families},
    }
def _family_cpu(process_summary: dict[str, Any], *families: str) -> float:
    """Sum the CPU percent of the named process families.

    Prefers the uncapped ``family_cpu_percent`` mapping when the summary has
    one. That way a family that fell outside the ``top_n`` display list still
    counts toward classification thresholds. Otherwise falls back to the
    (possibly truncated) ``families`` list. Repeating a name in ``families``
    does not double-count it. When none of the families is present, the result
    is ``0``.
    """
    totals = process_summary.get("family_cpu_percent")
    if isinstance(totals, dict):
        return sum(
            float(totals[name] or 0.0) for name in dict.fromkeys(families) if name in totals
        )
    return sum(
        float(item.get("cpu_percent") or 0.0)
        for item in process_summary.get("families", [])
        if item.get("family") in families
    )
def _top_family(process_summary: dict[str, Any]) -> dict[str, Any] | None:
    """Return the leading entry of ``process_summary["families"]``.

    Returns ``None`` when the key is absent or the list is empty/falsy. The
    summaries built in this script order that list by descending CPU, so the
    leading entry is the busiest family.
    """
    ranked = process_summary.get("families", [])
    if not ranked:
        return None
    return ranked[0]
def _shell_word(value: object) -> str:
    """Render ``value`` as one POSIX shell word for the emitted command strings.

    ``shlex.quote`` leaves plain identifiers and absolute paths unchanged, so
    commands built from the default inputs are byte-identical to naive
    interpolation. Values containing spaces or shell metacharacters are
    single-quoted rather than spliced into a command an operator or agent may
    run. Examples: an unexpected ``rule`` label read from the metrics
    textfile, or an unusual ``--host``.
    """
    return shlex.quote(str(value))


def _evidence_command(
    evidence_script: Path,
    *,
    host: str,
    metrics_file: Path,
    docker_stats_file: Path,
) -> str:
    """Build the read-only evidence-collection command shared by the playbook branches."""
    return (
        f"{_shell_word(evidence_script)} "
        f"--host {_shell_word(host)} --metrics-file {_shell_word(metrics_file)} "
        f"--docker-stats-file {_shell_word(docker_stats_file)} --json"
    )


def build_packet(
    *,
    host: str,
    samples: list[dict[str, Any]],
    docker_samples: list[dict[str, Any]],
    docker_stats_status: dict[str, Any],
    process_summary: dict[str, Any],
    load5_per_core_threshold: float,
    container_cpu_threshold: float,
    hot_container_cpu_threshold: float,
    process_family_cpu_threshold: float,
    ci_stale_age_seconds: int,
    script_dir: Path = DEFAULT_SCRIPT_DIR,
    metrics_file: Path = DEFAULT_METRICS_FILE,
    docker_stats_file: Path = DEFAULT_DOCKER_STATS_FILE,
) -> dict[str, Any]:
    """Classify the host's sustained-load situation into a read-only control packet.

    Inputs come pre-parsed:

    * ``samples``: from the host runaway-process textfile.
    * ``docker_samples``: from docker_stats.prom. Its top container is only
      trusted when ``docker_stats_status["fresh"] is True``; the raw value is
      still reported as ``top_container_cpu_untrusted``.
    * ``process_summary``: shaped like ``_summarize_processes`` output.

    "High load" below means ``load5_per_core > load5_per_core_threshold``.
    The first matching rule wins:

    1. monitor sample missing or not 1 -> ``blocked_monitor_unavailable``
    2. monitor reports remediation authority -> ``blocked_monitor_authority_violation``
    3. high load and an orphan browser rule ->
       ``controlled_orphan_browser_remediation_ready``. This is the only branch
       whose apply command runs the remediation helper with ``--apply``.
    4. high load and active Gitea Actions containers/groups ->
       ``controlled_ci_runner_saturation_guarded``. Apply is allowed only when
       the oldest group is at least ``ci_stale_age_seconds`` old.
    5. stockplatform container at or above ``container_cpu_threshold``, or
       postgres-family CPU at or above 2x ``process_family_cpu_threshold``.
    6. The same strong check for gitea.
    7. systemd/ssh control-plane CPU at or above ``process_family_cpu_threshold``.
    8. stockplatform above the weaker thresholds (``hot_container_cpu_threshold``
       or 1x the family threshold).
    9. The same weaker check for gitea.
    10. high load and swap used ratio at or above 0.85 -> memory/swap playbook.
    11. high load with no attributed source -> unknown-source playbook.

    Otherwise the classification is ``observing_load_within_threshold``.

    ``script_dir``, ``metrics_file`` and ``docker_stats_file`` only affect the
    rendered command strings. Interpolated values are shell-quoted; the
    ``${CONTROLLED_APPLY_ID}``/``${EVIDENCE_REF}`` placeholders are left
    unquoted on purpose so the executing shell expands them.
    """
    monitor_up = int(
        _sample_value(
            samples,
            "awoooi_host_runaway_process_monitor_up",
            host=host,
            labels={"mode": "read_only"},
            default=0,
        )
    )
    load5_per_core = _sample_value(samples, "awoooi_host_load5_per_core", host=host)
    swap_used_ratio = _sample_value(samples, "awoooi_host_swap_used_ratio", host=host)
    remediation_authorized = int(
        _sample_value(
            samples, "awoooi_host_runaway_process_remediation_authorized", host=host
        )
    )
    active_ci_containers = int(
        _sample_value(
            samples, "awoooi_host_gitea_actions_active_container_count", host=host
        )
    )
    active_ci_groups = int(
        _sample_value(
            samples, "awoooi_host_gitea_actions_active_process_group_count", host=host
        )
    )
    active_ci_cpu = _sample_value(
        samples, "awoooi_host_gitea_actions_active_process_cpu_percent", host=host
    )
    active_ci_oldest_age = int(
        _sample_value(
            samples,
            "awoooi_host_gitea_actions_active_process_oldest_age_seconds",
            host=host,
        )
    )
    top_orphan = _top_orphan_rule(samples, host=host)
    raw_top_container = _top_container_cpu(docker_samples, host=host)
    # Container-level signals are ignored unless docker stats are known fresh.
    top_container = raw_top_container if docker_stats_status.get("fresh") is True else None
    top_container_name = str((top_container or {}).get("container_name") or "").lower()
    top_container_cpu = float((top_container or {}).get("cpu_cores") or 0.0)
    stock_process_cpu = _family_cpu(process_summary, "postgres")
    gitea_process_cpu = _family_cpu(process_summary, "gitea_service")
    control_plane_cpu = _family_cpu(
        process_summary, "systemd_control_plane", "ssh_control_plane"
    )
    top_family = _top_family(process_summary)

    load_high = load5_per_core > load5_per_core_threshold
    stock_container = (
        "stockplatform-v2-postgres-1" in top_container_name
        or top_container_name == "stockplatform-v2-api-1"
    )
    gitea_container = top_container_name == "gitea"

    controller_script = script_dir / "host-sustained-load-controller.py"
    evidence_script = script_dir / "host-sustained-load-evidence.py"
    remediation_script = script_dir / "host-runaway-process-remediation.py"
    verifier_command = (
        f"{_shell_word(controller_script)} "
        f"--host {_shell_word(host)} --metrics-file {_shell_word(metrics_file)}"
    )
    evidence_command = _evidence_command(
        evidence_script,
        host=host,
        metrics_file=metrics_file,
        docker_stats_file=docker_stats_file,
    )

    classification = "observing_load_within_threshold"
    severity = "info"
    controlled_apply_allowed = False
    next_action = "keep_read_only_monitoring"
    dry_run_command = ""
    controlled_apply_command = ""

    if monitor_up != 1:
        classification = "blocked_monitor_unavailable"
        severity = "warning"
        next_action = "restore_host_runaway_process_exporter_textfile_before_apply"
    elif remediation_authorized > 0:
        classification = "blocked_monitor_authority_violation"
        severity = "critical"
        next_action = "rollback_monitor_to_read_only_exporter"
    elif load_high and top_orphan:
        classification = "controlled_orphan_browser_remediation_ready"
        severity = "critical"
        controlled_apply_allowed = True
        rule = _shell_word(top_orphan["rule"])
        dry_run_command = f"{_shell_word(remediation_script)} --rule {rule}"
        # The verifier travels as a single argument; for plain paths/hosts
        # shlex.quote produces the same '...' wrapping as a literal quote.
        controlled_apply_command = (
            f"{_shell_word(remediation_script)} "
            f"--rule {rule} --apply --confirm-apply "
            "--controlled-apply-id ${CONTROLLED_APPLY_ID} "
            "--evidence-ref ${EVIDENCE_REF} "
            f"--post-apply-verifier {shlex.quote(verifier_command)} "
            "--wait-seconds 10"
        )
        next_action = "run_orphan_browser_remediation_dry_run_then_controlled_sigterm"
    elif load_high and (active_ci_containers > 0 or active_ci_groups > 0):
        ci_stale = active_ci_oldest_age >= ci_stale_age_seconds
        classification = "controlled_ci_runner_saturation_guarded"
        severity = "critical" if ci_stale else "warning"
        controlled_apply_allowed = ci_stale
        dry_run_command = (
            "ops/runner/read-public-gitea-actions-queue.py --json "
            "&& ops/runner/check-awoooi-non110-runner-readiness.sh"
        )
        controlled_apply_command = (
            "keep_110_runner_pressure_gate_fail_closed; "
            "only cancel/drain stale Gitea Actions through runner verifier packet"
        )
        next_action = (
            "prepare_runner_drain_or_cancel_packet_without_process_kill"
            if ci_stale
            else "keep_pressure_gate_fail_closed_until_ci_load_clears"
        )
    elif (
        stock_container and top_container_cpu >= container_cpu_threshold
    ) or stock_process_cpu >= process_family_cpu_threshold * 2:
        classification = "blocked_stockplatform_hot_query_or_api_pressure_requires_playbook"
        severity = (
            "critical"
            if load_high or stock_process_cpu >= process_family_cpu_threshold * 2
            else "warning"
        )
        dry_run_command = evidence_command
        next_action = "run_stockplatform_hot_query_or_api_pressure_playbook_check_mode"
    elif (
        gitea_container and top_container_cpu >= container_cpu_threshold
    ) or gitea_process_cpu >= process_family_cpu_threshold * 2:
        classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook"
        severity = (
            "critical"
            if load_high or gitea_process_cpu >= process_family_cpu_threshold * 2
            else "warning"
        )
        dry_run_command = evidence_command
        next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode"
    elif control_plane_cpu >= process_family_cpu_threshold:
        classification = "blocked_control_plane_saturation_requires_playbook"
        severity = "critical" if load_high else "warning"
        dry_run_command = evidence_command
        next_action = "run_control_plane_saturation_playbook_check_mode"
    elif (
        stock_container and top_container_cpu >= hot_container_cpu_threshold
    ) or stock_process_cpu >= process_family_cpu_threshold:
        classification = "blocked_stockplatform_hot_query_or_api_pressure_requires_playbook"
        severity = "critical" if load_high else "warning"
        dry_run_command = evidence_command
        next_action = "run_stockplatform_hot_query_or_api_pressure_playbook_check_mode"
    elif (
        gitea_container and top_container_cpu >= hot_container_cpu_threshold
    ) or gitea_process_cpu >= process_family_cpu_threshold:
        classification = "blocked_gitea_queue_or_hook_backlog_requires_playbook"
        severity = "critical" if load_high else "warning"
        dry_run_command = evidence_command
        next_action = "run_gitea_queue_or_hook_backlog_playbook_check_mode"
    elif load_high and swap_used_ratio >= 0.85:
        classification = "blocked_memory_or_swap_pressure_requires_service_playbook"
        severity = "critical"
        next_action = "route_to_service_specific_memory_pressure_playbook"
    elif load_high:
        classification = "blocked_unknown_sustained_load_requires_source_specific_playbook"
        severity = "critical"
        dry_run_command = evidence_command
        next_action = "collect_sanitized_top_process_and_container_stats_then_select_playbook"
    return {
        "schema_version": SCHEMA_VERSION,
        "host": host,
        "mode": "read_only_control_packet",
        "classification": classification,
        "severity": severity,
        "controlled_apply_allowed": controlled_apply_allowed,
        "next_action": next_action,
        "readback": {
            "monitor_up": monitor_up,
            "load5_per_core": round(load5_per_core, 6),
            "load5_per_core_threshold": load5_per_core_threshold,
            "container_cpu_threshold": container_cpu_threshold,
            "hot_container_cpu_threshold": hot_container_cpu_threshold,
            "process_family_cpu_threshold": process_family_cpu_threshold,
            "script_dir": str(script_dir),
            "swap_used_ratio": round(swap_used_ratio, 6),
            "remediation_authorized": remediation_authorized,
            "active_ci_container_count": active_ci_containers,
            "active_ci_process_group_count": active_ci_groups,
            "active_ci_process_cpu_percent": round(active_ci_cpu, 3),
            "active_ci_oldest_age_seconds": active_ci_oldest_age,
            "top_orphan_rule": top_orphan,
            "top_container_cpu": top_container,
            "top_container_cpu_untrusted": raw_top_container,
            "docker_stats": docker_stats_status,
            "top_process_family": top_family,
            "stock_process_cpu_percent": round(stock_process_cpu, 3),
            "gitea_process_cpu_percent": round(gitea_process_cpu, 3),
            "control_plane_process_cpu_percent": round(control_plane_cpu, 3),
        },
        "commands": {
            "dry_run": dry_run_command,
            "controlled_apply": controlled_apply_command,
            "post_apply_verifier": verifier_command,
            "rollback": "send SIGTERM only; no persistent host mutation. Re-run workload if needed.",
        },
        "operation_boundaries": {
            "secret_value_read": False,
            "raw_session_read": False,
            "raw_runner_registration_read": False,
            "host_write_performed": False,
            "process_signal_performed": False,
            "docker_restart_allowed": False,
            "systemd_restart_allowed": False,
            "firewall_change_allowed": False,
            "critical_break_glass_required": True,
        },
        "forbidden_actions": [
            "SIGKILL",
            "docker_restart",
            "systemctl_restart",
            "nginx_reload",
            "firewall_change",
            "kubectl_action",
            "secret_read",
            "legacy_or_generic_runner_restore",
        ],
    }


def _read_prometheus_file(path: Path) -> list[dict[str, Any]]:
    """Parse the Prometheus textfile at ``path``; unreadable input yields no samples.

    Missing, permission-denied, directory, or non-UTF-8 files all return
    ``[]``. For the metrics file this makes ``monitor_up`` read as 0, so the
    packet fails closed as ``blocked_monitor_unavailable`` instead of the CLI
    crashing.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return []
    return parse_prometheus_text(text)


def main() -> int:
    """CLI entry point: build the packet from the configured inputs and print it.

    Returns 75 for ``blocked_*`` classifications and 0 otherwise (75 is
    presumably chosen as sysexits ``EX_TEMPFAIL``; confirm with callers).
    """
    args = parse_args()
    samples = _read_prometheus_file(args.metrics_file)
    docker_samples = _read_prometheus_file(args.docker_stats_file)
    packet = build_packet(
        host=args.host,
        samples=samples,
        docker_samples=docker_samples,
        docker_stats_status=docker_stats_freshness(
            samples=samples,
            docker_stats_file=args.docker_stats_file,
            max_age_seconds=args.docker_stats_max_age_seconds,
        ),
        process_summary=_summarize_processes(
            _parse_ps_text(_collect_ps_text(args.ps_file)), top_n=args.top_n
        ),
        load5_per_core_threshold=args.load5_per_core_threshold,
        container_cpu_threshold=args.container_cpu_threshold,
        hot_container_cpu_threshold=args.hot_container_cpu_threshold,
        process_family_cpu_threshold=args.process_family_cpu_threshold,
        ci_stale_age_seconds=args.ci_stale_age_seconds,
        script_dir=args.script_dir,
        # Emit commands that read the same files this run actually read.
        metrics_file=args.metrics_file,
        docker_stats_file=args.docker_stats_file,
    )
    if args.json:
        print(json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True))
    else:
        print(f"status={packet['classification']}")
        print(f"controlled_apply_allowed={str(packet['controlled_apply_allowed']).lower()}")
        print(f"next_action={packet['next_action']}")
        if packet["commands"]["dry_run"]:
            print(f"dry_run_command={packet['commands']['dry_run']}")
        if packet["commands"]["controlled_apply"]:
            print(f"controlled_apply_command={packet['commands']['controlled_apply']}")
        print(f"post_apply_verifier={packet['commands']['post_apply_verifier']}")
    return 75 if packet["classification"].startswith("blocked_") else 0


if __name__ == "__main__":
    raise SystemExit(main())