fix(recovery): surface host pressure in reboot slo
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled

This commit is contained in:
Your Name
2026-07-01 07:50:20 +08:00
parent f0c24dcd93
commit 46c2b8f8c4
3 changed files with 240 additions and 0 deletions

View File

@@ -38,6 +38,11 @@ def parse_args() -> argparse.Namespace:
type=Path,
help="Optional StockPlatform /api/v1/system/ingestion JSON readback.",
)
parser.add_argument(
"--host-pressure-file",
type=Path,
help="Optional host pressure JSON readback from Prometheus / node exporter.",
)
parser.add_argument("--generated-at", help="Override generated_at for stable snapshots.")
parser.add_argument(
"--required-host",
@@ -204,6 +209,14 @@ def source_controls() -> dict[str, bool]:
"RebootAutoRecoverySLOMissed",
"BackupCoverageDomainStale",
),
"conversation_event_hot_path_index_migration_source_present": file_contains(
source_file(
"apps/api/migrations/awooop_conversation_event_hot_path_indexes_2026-07-01.sql"
),
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_conv_event_recent",
"idx_awooop_conv_event_project_provider_recent",
"idx_awooop_conv_event_source_refs_event_ids_gin",
),
}
@@ -400,10 +413,118 @@ def build_stockplatform_readback(
}
def float_value(value: Any, default: float = 0.0) -> float:
    """Coerce *value* to a finite float, returning *default* when that fails.

    The value is converted through ``str`` first, so anything whose text form
    is not a number (``None``, booleans, nested containers) yields *default*
    instead of raising.

    Non-finite results (``nan``, ``inf``, ``-inf``) are also replaced with
    *default*. Metric readbacks can carry "NaN" / "+Inf" sample values; a NaN
    makes every threshold comparison evaluate to False, and both NaN and
    Infinity serialise to tokens that are not valid strict JSON.

    Args:
        value: Raw value taken from a decoded JSON payload.
        default: Value returned when *value* is not a finite number.

    Returns:
        The parsed finite float, or *default*.
    """
    # Local import keeps this helper self-contained; the module's import block
    # is not visible from here.
    import math

    try:
        number = float(str(value))
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default
def normalize_top_containers(value: Any) -> list[dict[str, Any]]:
    """Reduce a raw ``top_containers`` value to a list of uniform rows.

    Anything that is not a list produces an empty list, and list entries that
    are not dicts are dropped. Each surviving entry becomes a dict with:

    * ``container_name``: text of ``container_name``, falling back to
      ``name``, then to an empty string.
    * ``cpu_cores``: ``cpu_cores`` parsed by ``float_value`` and rounded to
      four decimal places.
    """
    if not isinstance(value, list):
        return []
    return [
        {
            "container_name": str(
                entry.get("container_name") or entry.get("name") or ""
            ),
            "cpu_cores": round(float_value(entry.get("cpu_cores")), 4),
        }
        for entry in value
        if isinstance(entry, dict)
    ]
def build_host_pressure_readback(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarise a host pressure readback into rows, pressure lists and blockers.

    Args:
        payload: Decoded host pressure JSON. ``hosts`` is read as a list of
            per-host dicts and ``conversation_event_hot_path_indexes_present``
            as an optional flag; any other shape degrades to "no hosts".

    Returns:
        A dict holding the normalised host rows, the hosts that crossed each
        pressure threshold, the resulting blocker names, and the fixed lists of
        safe and forbidden follow-up actions.
    """

    def container_at_or_above(
        containers: list[dict[str, Any]], name: str, minimum_cores: float
    ) -> bool:
        # True when a container with this exact name uses at least minimum_cores.
        for entry in containers:
            if entry["container_name"] == name and entry["cpu_cores"] >= minimum_cores:
                return True
        return False

    raw_hosts = payload.get("hosts")
    host_items = raw_hosts if isinstance(raw_hosts, list) else []
    indexes_present = payload.get("conversation_event_hot_path_indexes_present")

    host_rows: list[dict[str, Any]] = []
    overloaded: list[str] = []
    gitea_hot: list[str] = []
    postgres_hot: list[str] = []

    for entry in host_items:
        if not isinstance(entry, dict):
            continue
        name = str(entry.get("host") or entry.get("alias") or "")
        if name == "":
            # Rows without an identifier cannot be attributed to a host.
            continue

        core_count = float_value(entry.get("cores") or entry.get("cpu_cores"), 0.0)
        five_minute_load = float_value(entry.get("load5"), 0.0)
        per_core_load = float_value(entry.get("load5_per_core"), 0.0)
        # Derive the per-core figure when the readback did not supply a usable one.
        if per_core_load <= 0 and core_count > 0:
            per_core_load = five_minute_load / core_count
        containers = normalize_top_containers(entry.get("top_containers"))

        host_rows.append(
            {
                "host": name,
                "load1": round(float_value(entry.get("load1")), 4),
                "load5": round(five_minute_load, 4),
                "cores": round(core_count, 4),
                "load5_per_core": round(per_core_load, 4),
                "node_procs_running": int_value(entry.get("node_procs_running")),
                "node_procs_blocked": int_value(entry.get("node_procs_blocked")),
                # Only the first five containers are reported; the threshold
                # checks below still look at the full list.
                "top_containers": containers[:5],
            }
        )

        # The threshold uses the unrounded per-core value.
        if per_core_load > 1.0:
            overloaded.append(name)
        if name == "110" and container_at_or_above(containers, "gitea", 2.0):
            gitea_hot.append(name)
        if name == "188" and container_at_or_above(
            containers, "k3s-postgres-recovery", 4.0
        ):
            postgres_hot.append(name)

    # Index drift is only raised alongside postgres pressure, and only when the
    # readback explicitly reports the indexes as absent (False, not missing).
    blocker_rules = (
        (bool(overloaded), "host_pressure_high_load"),
        (bool(gitea_hot), "host_110_gitea_cpu_pressure"),
        (bool(postgres_hot), "host_188_postgres_cpu_pressure"),
        (
            bool(postgres_hot) and indexes_present is False,
            "awooop_conversation_event_hot_path_index_drift_detected",
        ),
    )
    active_blockers = [label for triggered, label in blocker_rules if triggered]

    return {
        "readback_present": bool(payload),
        "host_count": len(host_rows),
        "hosts": host_rows,
        "high_load_hosts": overloaded,
        "gitea_pressure_hosts": gitea_hot,
        "postgres_pressure_hosts": postgres_hot,
        "conversation_event_hot_path_indexes_present": indexes_present,
        "blockers": active_blockers,
        "safe_actions": [
            "keep_110_legacy_runner_failclosed",
            "read_public_gitea_queue_metadata_only",
            "apply_conversation_event_hot_path_indexes_via_controlled_db_migration",
            "rerun_host_pressure_and_cold_start_scorecard_after_apply",
        ],
        "forbidden_actions": [
            "reboot_host_from_slo_lane",
            "restart_gitea_or_postgres_without_break_glass",
            "unmask_legacy_runner_or_restore_generic_labels",
            "read_secret_or_runner_token",
            "manual_db_update_or_destructive_restore",
        ],
    }
def choose_safe_next_step(
*,
blockers: list[str],
stockplatform: dict[str, Any],
host_pressure: dict[str, Any],
) -> str:
freshness_status = str(stockplatform.get("freshness_status") or "unknown")
eod_window = stockplatform.get("eod_window") if isinstance(stockplatform.get("eod_window"), dict) else {}
@@ -426,6 +547,17 @@ def choose_safe_next_step(
"inspect_stockplatform_ingestion_readback_and_wait_retry_windows_then_"
"rerun_slo_verify_only_no_reboot"
)
pressure_blockers = set(strings(host_pressure.get("blockers")))
if "awooop_conversation_event_hot_path_index_drift_detected" in pressure_blockers:
return (
"apply_awooop_conversation_event_hot_path_indexes_via_controlled_db_"
"migration_then_rerun_188_postgres_cpu_readback_no_reboot"
)
if "host_110_gitea_cpu_pressure" in pressure_blockers:
return (
"keep_110_runner_failclosed_read_public_gitea_queue_and_recover_awoooi_host_"
"controlled_lane_only_after_verifier_no_generic_runner"
)
if blockers == ["host_boot_observation_older_than_target_window"]:
return (
"timer_deployed_and_services_readback_green_wait_for_next_all_host_reboot_"
@@ -450,6 +582,7 @@ def build_scorecard(args: argparse.Namespace) -> dict[str, Any]:
ingestion=read_json_object(args.stock_ingestion_file),
generated_at=generated_at,
)
host_pressure = build_host_pressure_readback(read_json_object(args.host_pressure_file))
controls = source_controls()
free_gib = disk_free_gib(args.disk_path)
@@ -529,6 +662,7 @@ def build_scorecard(args: argparse.Namespace) -> dict[str, Any]:
blockers.append("wazuh_dashboard_degraded")
if free_gib is not None and free_gib < args.min_free_gib:
blockers.append("local_disk_free_below_minimum")
blockers.extend(strings(host_pressure.get("blockers")))
max_uptime = max(
[int_value(row.get("uptime_seconds"), 0) for row in host_rows if row.get("reachable")]
@@ -539,6 +673,7 @@ def build_scorecard(args: argparse.Namespace) -> dict[str, Any]:
safe_next_step = choose_safe_next_step(
blockers=unique_blockers,
stockplatform=stockplatform,
host_pressure=host_pressure,
)
return {
"schema_version": SCHEMA_VERSION,
@@ -608,6 +743,7 @@ def build_scorecard(args: argparse.Namespace) -> dict[str, Any]:
"next_required_gates": summary.get("NEXT_REQUIRED_GATES", "unknown"),
},
"stockplatform_data_freshness": stockplatform,
"host_pressure": host_pressure,
"capacity": {
"checked": free_gib is not None,
"free_gib": round(free_gib, 3) if free_gib is not None else None,

View File

@@ -87,6 +87,41 @@ def run_scorecard(tmp_path: Path, summary: str, probe: str = HOST_PROBE_GREEN) -
return json.loads(result.stdout)
def run_scorecard_with_host_pressure(
    tmp_path: Path,
    summary: str,
    host_pressure: dict,
) -> dict:
    """Run the scorecard script with a host pressure readback and return its JSON.

    The summary text, the green host probe, the green reboot event and the
    given *host_pressure* payload are written into *tmp_path*; the script is
    then run as a subprocess with a pinned ``--generated-at`` and its stdout is
    decoded as JSON. A non-zero exit raises ``subprocess.CalledProcessError``.
    """

    def stage(filename: str, text: str) -> Path:
        # Write one fixture file into tmp_path and hand back its location.
        target = tmp_path / filename
        target.write_text(text, encoding="utf-8")
        return target

    summary_file = stage("summary.txt", summary)
    probe_file = stage("probe.txt", HOST_PROBE_GREEN)
    reboot_event_file = stage("reboot-event.json", json.dumps(REBOOT_EVENT_GREEN))
    host_pressure_file = stage("host-pressure.json", json.dumps(host_pressure))

    command = [
        sys.executable,
        str(SCRIPT),
        "--summary-file",
        str(summary_file),
        "--host-probe-file",
        str(probe_file),
        "--reboot-event-file",
        str(reboot_event_file),
        "--host-pressure-file",
        str(host_pressure_file),
        "--generated-at",
        "2026-07-01T07:45:00+08:00",
    ]
    completed = subprocess.run(
        command,
        text=True,
        capture_output=True,
        check=True,
    )
    return json.loads(completed.stdout)
def run_scorecard_with_stock(
tmp_path: Path,
summary: str,
@@ -139,6 +174,9 @@ def test_green_summary_and_recent_all_host_probe_can_claim_slo(tmp_path: Path) -
] is True
assert payload["host_boot_detection"]["max_observed_uptime_seconds"] == 150
assert payload["active_blockers"] == []
assert payload["source_controls"][
"conversation_event_hot_path_index_migration_source_present"
] is True
def test_missing_probe_fails_closed(tmp_path: Path) -> None:
@@ -237,6 +275,58 @@ STOCK_EOD_FINAL_RETRY_WINDOW_END_LOCAL=23:35
)
def test_host_pressure_blocks_slo_with_index_drift_next_step(tmp_path: Path) -> None:
    """Host pressure plus missing hot-path indexes blocks the SLO claim.

    Host 110 is above one load5 per core and runs a hot ``gitea`` container;
    host 188 stays below the load threshold but runs a hot
    ``k3s-postgres-recovery`` container while the readback reports the
    conversation event indexes as absent. The scorecard must surface all three
    blockers and choose the index-migration next step.
    """
    gitea_host = {
        "host": "110",
        "load1": 20.74,
        "load5": 18.05,
        "cores": 12,
        "node_procs_running": 63,
        "node_procs_blocked": 0,
        "top_containers": [
            {"container_name": "gitea", "cpu_cores": 3.4019},
        ],
    }
    postgres_host = {
        "host": "188",
        "load1": 11.2,
        "load5": 10.8,
        "cores": 12,
        "node_procs_running": 10,
        "node_procs_blocked": 0,
        "top_containers": [
            {
                "container_name": "k3s-postgres-recovery",
                "cpu_cores": 8.5489,
            },
        ],
    }
    readback = {
        "conversation_event_hot_path_indexes_present": False,
        "hosts": [gitea_host, postgres_host],
    }

    payload = run_scorecard_with_host_pressure(tmp_path, GREEN_SUMMARY, readback)

    assert payload["status"] == "blocked_reboot_auto_recovery_slo_not_ready"

    pressure = payload["host_pressure"]
    assert pressure["high_load_hosts"] == ["110"]
    assert pressure["gitea_pressure_hosts"] == ["110"]
    assert pressure["postgres_pressure_hosts"] == ["188"]

    active = payload["active_blockers"]
    assert "host_110_gitea_cpu_pressure" in active
    assert "host_188_postgres_cpu_pressure" in active
    assert "awooop_conversation_event_hot_path_index_drift_detected" in active

    assert payload["safe_next_step"] == (
        "apply_awooop_conversation_event_hot_path_indexes_via_controlled_db_"
        "migration_then_rerun_188_postgres_cpu_readback_no_reboot"
    )
def test_stockplatform_recovered_marks_controlled_gate_not_required(
tmp_path: Path,
) -> None: