#!/usr/bin/env python3 """Read-only check-mode playbook for 110 Gitea CPU pressure. This helper turns the controller's generic Gitea pressure branch into a source-specific packet. It only reads public Gitea health/version/metrics and sanitized host textfiles; it does not read secrets, app.ini, raw runner registrations, sessions, or environment files, and it does not mutate host state. """ from __future__ import annotations import argparse import json import re import time import urllib.error import urllib.request from pathlib import Path from typing import Any DEFAULT_HOST_METRICS_FILE = Path("/home/wooo/node_exporter_textfiles/host_runaway_process.prom") DEFAULT_DOCKER_STATS_FILE = Path("/home/wooo/node_exporter_textfiles/docker_stats.prom") DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS = 300 DEFAULT_GITEA_METRICS_URL = "http://192.168.0.110:3001/metrics" DEFAULT_GITEA_HEALTH_URL = "http://192.168.0.110:3001/api/healthz" DEFAULT_GITEA_VERSION_URL = "http://192.168.0.110:3001/api/v1/version" SCHEMA_VERSION = "gitea_queue_hook_backlog_check_mode_v1" LABEL_RE = re.compile(r"(?P[A-Za-z_][A-Za-z0-9_]*)=\"(?P(?:[^\"\\\\]|\\\\.)*)\"") METRIC_RE = re.compile( r"^(?P[A-Za-z_:][A-Za-z0-9_:]*)(?:\{(?P[^}]*)\})?\s+" r"(?P[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)$" ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Build a read-only Gitea queue/hook backlog check-mode packet." ) parser.add_argument("--host", default="110") parser.add_argument("--metrics-file", type=Path, default=DEFAULT_HOST_METRICS_FILE) parser.add_argument("--docker-stats-file", type=Path, default=DEFAULT_DOCKER_STATS_FILE) parser.add_argument( "--docker-stats-max-age-seconds", type=int, default=DEFAULT_DOCKER_STATS_MAX_AGE_SECONDS, ) parser.add_argument("--gitea-metrics-url", default=DEFAULT_GITEA_METRICS_URL) parser.add_argument("--gitea-health-url", default=DEFAULT_GITEA_HEALTH_URL) parser.add_argument("--gitea-version-url", default=DEFAULT_GITEA_VERSION_URL) parser.add_argument("--gitea-metrics-file", type=Path) parser.add_argument("--gitea-health-file", type=Path) parser.add_argument("--gitea-version-file", type=Path) parser.add_argument("--queue-json-file", type=Path) parser.add_argument("--http-timeout-seconds", type=float, default=5.0) parser.add_argument("--hot-container-cpu-threshold", type=float, default=1.0) parser.add_argument("--gitea-family-cpu-threshold", type=float, default=50.0) parser.add_argument("--hooktasks-warning-threshold", type=float, default=1000.0) parser.add_argument("--json", action="store_true") return parser.parse_args() def _unescape_label(value: str) -> str: return value.replace(r"\"", '"').replace(r"\\", "\\").replace(r"\n", "\n") def parse_prometheus_text(text: str) -> list[dict[str, Any]]: samples: list[dict[str, Any]] = [] for raw_line in text.splitlines(): line = raw_line.strip() if not line or line.startswith("#"): continue match = METRIC_RE.match(line) if not match: continue labels = { item.group("key"): _unescape_label(item.group("value")) for item in LABEL_RE.finditer(match.group("labels") or "") } samples.append( { "name": match.group("name"), "labels": labels, "value": float(match.group("value")), } ) return samples def read_text(path: Path | None) -> str: if path is None: return "" try: return path.read_text(encoding="utf-8") except FileNotFoundError: return "" def _sample_value_any(samples: list[dict[str, Any]], name: str) -> float | None: for sample in samples: if sample["name"] == name: return float(sample["value"]) return None def _sample_value( samples: list[dict[str, Any]], name: str, *, host: str, labels: dict[str, str] | None = None, default: float = 0.0, ) -> float: expected = {"host": host, **(labels or {})} for sample in samples: if sample["name"] != name: continue sample_labels = sample["labels"] if all(sample_labels.get(key) == value for key, value in expected.items()): return float(sample["value"]) return default def _textfile_mtime_seconds(samples: list[dict[str, Any]], suffix: str) -> float | None: for sample in samples: if sample["name"] != "node_textfile_mtime_seconds": continue file_label = str(sample["labels"].get("file") or "") if file_label.endswith(suffix): return float(sample["value"]) return None def docker_stats_freshness( *, samples: list[dict[str, Any]], docker_stats_file: Path, max_age_seconds: int, ) -> dict[str, Any]: mtime = _textfile_mtime_seconds(samples, "docker_stats.prom") now = _sample_value_any(samples, "node_time_seconds") source = "node_textfile_mtime_seconds" if mtime is None: try: mtime = docker_stats_file.stat().st_mtime now = time.time() source = "file_stat_mtime" except FileNotFoundError: return { "fresh": False, "age_seconds": None, "max_age_seconds": max_age_seconds, "source": "missing", } if now is None: now = time.time() age_seconds = max(0, int(now - mtime)) return { "fresh": age_seconds <= max_age_seconds, "age_seconds": age_seconds, "max_age_seconds": max_age_seconds, "source": source, } def top_docker_containers( samples: list[dict[str, Any]], *, host: str, top_n: int = 5, ) -> list[dict[str, Any]]: rows = [] for sample in samples: if sample["name"] != "docker_container_cpu_cores": continue labels = sample["labels"] if labels.get("host", host) != host: continue rows.append( { "container_name": labels.get("container_name") or labels.get("name") or "unknown", "cpu_cores": round(float(sample["value"]), 6), } ) return sorted(rows, key=lambda item: (-item["cpu_cores"], item["container_name"]))[:top_n] def process_families(samples: list[dict[str, Any]], *, host: str) -> list[dict[str, Any]]: by_family: dict[str, dict[str, Any]] = {} for sample in samples: labels = sample["labels"] if labels.get("host") != host: continue family = labels.get("family") if not family: continue row = by_family.setdefault( family, { "family": family, "cpu_percent": 0.0, "process_count": 0, "oldest_age_seconds": 0, "top_info": "", }, ) if sample["name"] == "awoooi_host_process_family_cpu_percent": row["cpu_percent"] = round(float(sample["value"]), 3) elif sample["name"] == "awoooi_host_process_family_process_count": row["process_count"] = int(sample["value"]) elif sample["name"] == "awoooi_host_process_family_oldest_age_seconds": row["oldest_age_seconds"] = int(sample["value"]) elif sample["name"] == "awoooi_host_process_family_top_info": row["top_info"] = str(labels.get("top_info") or "")[:120] return sorted(by_family.values(), key=lambda item: (-float(item["cpu_percent"]), item["family"])) def _family_cpu(families: list[dict[str, Any]], family: str) -> float: for item in families: if item.get("family") == family: return float(item.get("cpu_percent") or 0.0) return 0.0 def fetch_text_or_file(*, url: str, path: Path | None, timeout_seconds: float) -> dict[str, Any]: if path is not None: text = read_text(path) return { "ok": bool(text), "status_code": 200 if text else None, "source": str(path), "text": text, "error_type": "" if text else "file_missing_or_empty", } request = urllib.request.Request( url, headers={"User-Agent": "awoooi-gitea-pressure-check-mode/1.0"}, ) try: with urllib.request.urlopen(request, timeout=timeout_seconds) as response: raw = response.read() status = int(getattr(response, "status", 200)) return { "ok": 200 <= status < 300, "status_code": status, "source": url, "text": raw.decode("utf-8", errors="replace"), "error_type": "", } except urllib.error.HTTPError as exc: return { "ok": False, "status_code": int(exc.code), "source": url, "text": "", "error_type": "http_error", } except (urllib.error.URLError, TimeoutError): return { "ok": False, "status_code": None, "source": url, "text": "", "error_type": "connection_error", } def _json_from_text(text: str) -> dict[str, Any]: try: value = json.loads(text) except json.JSONDecodeError: return {} return value if isinstance(value, dict) else {} def selected_gitea_metrics(samples: list[dict[str, Any]]) -> dict[str, Any]: selected = { "gitea_hooktasks": _sample_value_any(samples, "gitea_hooktasks"), "gitea_repositories": _sample_value_any(samples, "gitea_repositories"), "gitea_webhooks": _sample_value_any(samples, "gitea_webhooks"), "go_goroutines": _sample_value_any(samples, "go_goroutines"), "go_sched_gomaxprocs_threads": _sample_value_any(samples, "go_sched_gomaxprocs_threads"), "process_cpu_seconds_total": _sample_value_any(samples, "process_cpu_seconds_total"), "process_open_fds": _sample_value_any(samples, "process_open_fds"), "process_resident_memory_bytes": _sample_value_any(samples, "process_resident_memory_bytes"), "gitea_build_version": "", } for sample in samples: if sample["name"] == "gitea_build_info": selected["gitea_build_version"] = str(sample["labels"].get("version") or "") break return selected def queue_readback_summary(path: Path | None) -> dict[str, Any]: if path is None: return {"available": False, "source": "", "latest_visible_cd_run": None} data = _json_from_text(read_text(path)) visible_runs = data.get("top_visible_runs") or data.get("visible_runs") or [] latest_cd_run = None if isinstance(visible_runs, list): for item in visible_runs: if not isinstance(item, dict): continue workflow = str(item.get("workflow") or item.get("name") or "") if workflow == "cd.yaml" or "cd" in workflow.lower(): latest_cd_run = { "workflow": workflow, "run_id": str(item.get("run_id") or ""), "status": str(item.get("status") or ""), "commit_sha": str(item.get("commit_sha") or "")[:12], } break return { "available": bool(data), "source": str(path), "top_visible_run_count": len(visible_runs) if isinstance(visible_runs, list) else 0, "latest_visible_cd_run": latest_cd_run, "no_matching_runner_visible": data.get("no_matching_runner_visible"), } def build_payload(args: argparse.Namespace) -> dict[str, Any]: host_samples = parse_prometheus_text(read_text(args.metrics_file)) docker_samples = parse_prometheus_text(read_text(args.docker_stats_file)) docker_status = docker_stats_freshness( samples=host_samples, docker_stats_file=args.docker_stats_file, max_age_seconds=args.docker_stats_max_age_seconds, ) metrics_read = fetch_text_or_file( url=args.gitea_metrics_url, path=args.gitea_metrics_file, timeout_seconds=args.http_timeout_seconds, ) health_read = fetch_text_or_file( url=args.gitea_health_url, path=args.gitea_health_file, timeout_seconds=args.http_timeout_seconds, ) version_read = fetch_text_or_file( url=args.gitea_version_url, path=args.gitea_version_file, timeout_seconds=args.http_timeout_seconds, ) gitea_samples = parse_prometheus_text(str(metrics_read.get("text") or "")) gitea_metrics = selected_gitea_metrics(gitea_samples) health_json = _json_from_text(str(health_read.get("text") or "")) version_json = _json_from_text(str(version_read.get("text") or "")) families = process_families(host_samples, host=args.host) containers_untrusted = top_docker_containers(docker_samples, host=args.host) containers = containers_untrusted if docker_status.get("fresh") is True else [] gitea_container_cpu = 0.0 for container in containers: if str(container.get("container_name") or "").lower() == "gitea": gitea_container_cpu = float(container.get("cpu_cores") or 0.0) break active_actions = { "container_count": int( _sample_value( host_samples, "awoooi_host_gitea_actions_active_container_count", host=args.host, ) ), "process_group_count": int( _sample_value( host_samples, "awoooi_host_gitea_actions_active_process_group_count", host=args.host, ) ), "process_cpu_percent": round( _sample_value( host_samples, "awoooi_host_gitea_actions_active_process_cpu_percent", host=args.host, ), 3, ), } gitea_family_cpu = _family_cpu(families, "gitea_service") hooktasks = float(gitea_metrics.get("gitea_hooktasks") or 0.0) health_status = str(health_json.get("status") or "") classification = "observing_gitea_pressure_below_threshold" severity = "info" next_action = "keep_read_only_monitoring" if docker_status.get("fresh") is not True: classification = "blocked_gitea_pressure_attribution_stale_requires_textfile_recovery" severity = "warning" next_action = "restore_docker_stats_textfile_before_gitea_apply" elif not metrics_read.get("ok"): classification = "blocked_gitea_metrics_unavailable_requires_route_or_exporter_check" severity = "warning" next_action = "restore_public_gitea_metrics_readback_before_apply" elif health_status and health_status != "pass": classification = "blocked_gitea_health_degraded_requires_service_recovery_playbook" severity = "critical" next_action = "run_gitea_service_health_recovery_check_mode_without_restart" elif active_actions["container_count"] > 0 or active_actions["process_group_count"] > 0: classification = "blocked_gitea_actions_pressure_requires_runner_queue_packet" severity = "warning" next_action = "run_runner_queue_readback_and_keep_110_pressure_gate_fail_closed" elif ( gitea_container_cpu >= args.hot_container_cpu_threshold and hooktasks >= args.hooktasks_warning_threshold ): classification = "blocked_gitea_hooktask_backlog_check_required" severity = "warning" next_action = "read_gitea_hooktask_backlog_age_from_authorized_export_before_apply" elif ( gitea_container_cpu >= args.hot_container_cpu_threshold or gitea_family_cpu >= args.gitea_family_cpu_threshold ): classification = "blocked_gitea_service_hot_without_actions_backlog" severity = "warning" next_action = "run_gitea_metrics_rate_probe_then_select_quota_or_hook_playbook" return { "schema_version": SCHEMA_VERSION, "host": args.host, "mode": "read_only_check_mode", "classification": classification, "severity": severity, "controlled_apply_allowed": False, "next_action": next_action, "readback": { "docker_stats": docker_status, "gitea_metrics_http": { "ok": bool(metrics_read.get("ok")), "status_code": metrics_read.get("status_code"), "source": metrics_read.get("source"), "error_type": metrics_read.get("error_type"), }, "gitea_health_http": { "ok": bool(health_read.get("ok")), "status_code": health_read.get("status_code"), "status": health_status, "checks": sorted((health_json.get("checks") or {}).keys()), }, "gitea_version_http": { "ok": bool(version_read.get("ok")), "status_code": version_read.get("status_code"), "version": str(version_json.get("version") or ""), }, "selected_gitea_metrics": gitea_metrics, "active_actions": active_actions, "gitea_container_cpu_cores": round(gitea_container_cpu, 6), "gitea_process_family_cpu_percent": round(gitea_family_cpu, 3), "top_containers": containers, "top_containers_untrusted": containers_untrusted, "top_process_families": families[:5], "queue_readback": queue_readback_summary(args.queue_json_file), "thresholds": { "hot_container_cpu": args.hot_container_cpu_threshold, "gitea_family_cpu": args.gitea_family_cpu_threshold, "hooktasks_warning": args.hooktasks_warning_threshold, }, }, "commands": { "check_mode": ( "/home/wooo/scripts/gitea-queue-hook-backlog-playbook.py " f"--host {args.host} --metrics-file {DEFAULT_HOST_METRICS_FILE} " f"--docker-stats-file {DEFAULT_DOCKER_STATS_FILE} --json" ), "post_apply_verifier": ( "/home/wooo/scripts/host-sustained-load-controller.py " f"--host {args.host} --metrics-file {DEFAULT_HOST_METRICS_FILE} " f"--docker-stats-file {DEFAULT_DOCKER_STATS_FILE} --json" ), "controlled_apply": "", "rollback": "no host mutation performed by this check-mode playbook", }, "redaction": { "raw_metrics_emitted": False, "raw_command_lines_emitted": False, "workspace_paths_emitted": False, "urls_emitted_from_processes": False, "secret_values_read": False, }, "operation_boundaries": { "host_write_performed": False, "process_signal_performed": False, "docker_restart_performed": False, "systemd_restart_performed": False, "nginx_reload_performed": False, "database_query_performed": False, "raw_runner_registration_read": False, "raw_session_read": False, "secret_value_read": False, }, } def main() -> int: args = parse_args() payload = build_payload(args) if args.json: print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)) else: print(f"status={payload['classification']}") print(f"controlled_apply_allowed={str(payload['controlled_apply_allowed']).lower()}") print(f"next_action={payload['next_action']}") print(f"check_mode_command={payload['commands']['check_mode']}") print(f"post_apply_verifier={payload['commands']['post_apply_verifier']}") return 0 if not payload["classification"].startswith("blocked_") else 75 if __name__ == "__main__": raise SystemExit(main())