feat(recovery): add credential escrow intake scorecard [skip ci]
This commit is contained in:
@@ -1,3 +1,21 @@
|
||||
## 2026-06-29 — 09:36 credential escrow intake scorecard no-secret readback
|
||||
|
||||
**完成內容**:
|
||||
- 新增 `scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py`,把同一輪 summary、owner packet、response template、offsite escrow report 與 marker status 收成 key/value / JSON scorecard。
|
||||
- scorecard 只讀 sanitized artifacts;不送 owner request、不讀 secret、不寫 credential marker、不啟動 runtime action。
|
||||
- 新增 `scripts/reboot-recovery/tests/test_post_reboot_credential_escrow_intake_scorecard.py`,覆蓋目前 credential-only gate 的 fail-closed 狀態,以及 stale `wazuh_manager_registry_export` response gate 會被判為 gate mismatch。
|
||||
|
||||
**驗證結果**:
|
||||
- 本地 focused tests:`python3.11 -m pytest scripts/reboot-recovery/tests/test_post_reboot_credential_escrow_intake_scorecard.py scripts/reboot-recovery/tests/test_post_reboot_owner_response_template.py scripts/reboot-recovery/tests/test_post_start_smoke_process_classifier.py scripts/reboot-recovery/tests/test_momo_source_arrival_gate.py -q`:`11 passed`。
|
||||
- `python3.11 -m py_compile scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py scripts/reboot-recovery/post-reboot-owner-response-template.py scripts/reboot-recovery/post-reboot-owner-response-preflight.py`:通過。
|
||||
- `git diff --check`:通過。
|
||||
- 110 read-only artifacts:`/tmp/awoooi-offsite-escrow-evidence-report-20260629-credential-intake.txt` 與 `/tmp/awoooi-credential-escrow-status-20260629-credential-intake.txt`;讀回 `SCRIPT_MISSING_COUNT=0`、`OFFSITE_CONFIGURED=1`、`RCLONE_CONFIGURED=1`、`MISSING_ESCROW_MARKER_COUNT=5`、五個 escrow item 全部 `missing`。
|
||||
- scorecard artifact `/tmp/awoooi-credential-escrow-intake-scorecard-20260629.json`:`STATUS=blocked_waiting_non_secret_credential_escrow_evidence`、`ACTIVE_GATE_PRESENT=1`、`OWNER_PACKET_GATE_COUNT=1`、`RESPONSE_GATE_COUNT=1`、`UNEXPECTED_RESPONSE_GATE_COUNT=0`、`REQUIRED_ITEM_COUNT=5`、`EFFECTIVE_ESCROW_MISSING_COUNT=5`、`PREFLIGHT_STATUS=blocked_waiting_owner_response_content`、`OWNER_RESPONSE_RECEIVED_COUNT=0`、`OWNER_RESPONSE_ACCEPTED_COUNT=0`、`RUNTIME_GATE_COUNT=0`、`SECRET_VALUE_COLLECTION_ALLOWED=0`、`CREDENTIAL_MARKER_WRITE_AUTHORIZED_COUNT=0`、`FORBIDDEN_TRUE_FIELD_COUNT=0`。
|
||||
|
||||
**仍維持**:
|
||||
- `ESCROW_MISSING_COUNT=5`,不得宣稱 `DR_COMPLETE`。
|
||||
- 此段未讀、複製、貼上或外送 password / token / secret / `.runner` / raw session / SQLite / auth / `.env`;未寫 marker、未重啟 Docker / Nginx / firewall / K3s / DB、未使用 GitHub。
|
||||
|
||||
## 2026-06-29 — 09:30 Delivery Workbench queue verifier contract 投影
|
||||
|
||||
**完成內容**:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# AWOOOI 全棧冷啟動與主機重啟 SOP
|
||||
|
||||
> Version: v1.79
|
||||
> Version: v1.80
|
||||
> Last updated: 2026-06-29 Asia/Taipei
|
||||
> Scope: 110 / 120 / 121 / 188 full-stack reboot recovery. 112 Kali is recorded as P3 optional and is not part of this recovery path.
|
||||
|
||||
@@ -16,6 +16,8 @@ v1.76 owner gate replay rule:同一輪 summary 產生後,owner packet 與 ow
|
||||
|
||||
v1.79 active owner response template rule:同一輪 owner packet 產生後,placeholder response 必須由 `scripts/reboot-recovery/post-reboot-owner-response-template.py --owner-packet-file <owner-packets.json>` 生成,讓 `responses[].gate_id` 等於 active `owner_packets[].packet_id`。目前 2026-06-29 09:13 readback 只剩 `credential_escrow_evidence`,因此 generated template 不得帶入 `wazuh_manager_registry_export`。placeholder template 必須被 preflight 擋在 `blocked_waiting_owner_response_content`、`received=0`、`accepted=0`、`runtime_gate=0`;它是 no-secret intake aid,不是 owner accepted 或 marker-write 授權。
|
||||
|
||||
v1.80 credential escrow intake scorecard rule:同一輪 owner response preflight 後,必須用 `scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py --summary-file "$ARTIFACT_DIR/summary.txt" --owner-packet-file <owner-packets.json> --response-file <owner-response-template-or-candidate.json> --offsite-report-file <offsite-report.txt> --escrow-status-file <escrow-status.txt>` 收斂 DR escrow gate。scorecard 只讀 sanitized artifacts;不得讀 secret value、不得寫 marker、不得送 owner request、不得開 runtime gate。2026-06-29 09:36 readback 期望 `STATUS=blocked_waiting_non_secret_credential_escrow_evidence`、`EFFECTIVE_ESCROW_MISSING_COUNT=5`、`OWNER_RESPONSE_RECEIVED_COUNT=0`、`OWNER_RESPONSE_ACCEPTED_COUNT=0`、`RUNTIME_GATE_COUNT=0`、`CREDENTIAL_MARKER_WRITE_AUTHORIZED_COUNT=0`。
|
||||
|
||||
2026-06-29 09:13 latest live summary:`scripts/reboot-recovery/post-reboot-readiness-summary.sh --no-color` artifact `/tmp/awoooi-post-reboot-readiness-20260629-091918/summary.txt` 回傳 `POST_START_RESULT=FULL_STACK_GREEN_DR_ESCROW_BLOCKED`、`POST_START_SERVICE_WARNINGS=0`、`SERVICE_GREEN=1`、`PRODUCT_DATA_GREEN=1`、`STOCK_FRESHNESS_STATUS=ok`、`STOCK_LATEST_TRADING_DATE=2026-06-26`、`BACKUP_CORE_GREEN=1`、`HOST_188_HYGIENE_BLOCKED=0`、`WAZUH_MANAGER_REGISTRY_ACCEPTED=6`、`RUNTIME_ACTION_AUTHORIZED=0`、`NEXT_REQUIRED_GATES=credential_escrow_evidence`。目前仍不可宣稱 `DR_COMPLETE`,因為 `ESCROW_MISSING_COUNT=5`;owner packet contract guard 期望 `gates=1`。
|
||||
|
||||
2026-06-27 11:51 最新 live revalidation:`scripts/reboot-recovery/post-reboot-readiness-summary.sh --no-color` artifact `/tmp/awoooi-post-reboot-readiness-20260627-115046/summary.txt` 回傳 `POST_START_RESULT=BLOCKED`、`POST_START_PASS=37`、`POST_START_WARN=3`、`POST_START_BLOCKED=2`、`SERVICE_GREEN=0`、`PRODUCT_DATA_GREEN=1`、`STOCK_FRESHNESS_STATUS=ok`、`STOCK_LATEST_TRADING_DATE=2026-06-26`、`STOCK_BLOCKERS=none`、`BACKUP_CORE_GREEN=1`、`HOST_188_HYGIENE_BLOCKED=0`、`WAZUH_MANAGER_REGISTRY_ACCEPTED=0`、`RUNTIME_ACTION_AUTHORIZED=0`。本輪已再次修復 188 `momo_pg_daily` crontab configured drift,`backup-status` 回 `core_blockers=0`、`configured_missing_188=0`;K3s / ArgoCD live readback 顯示 120 / 121 皆 `Ready`,`awoooi-prod` 為 `Synced / Healthy`,api/web/worker pods 均 Running。現在 hard blocker 是 MOMO business data freshness:`daily_sales_snapshot` 最新仍為 `2026-06-24`,`DRIVE_INTAKE_COUNT=0`,Drive archive / global latest `即時業績_當日` 均為 `2026-06-25T04:21:47Z`,最新 import job `57` 已 clean completed 且 `sync_success=true`。因此可宣稱主機、K3s、public routes、backup core 與 Stock freshness 已恢復;不可宣稱 full-stack green,直到 MOMO 來源檔補齊並由正式 import pipeline 更新 DB。DR complete 仍因 `ESCROW_MISSING_COUNT=5` 禁止宣稱,Wazuh 全主機納管仍因 manager registry accepted `0` 禁止宣稱。
|
||||
|
||||
@@ -120,9 +120,10 @@ scripts/reboot-recovery/post-reboot-owner-response-preflight.py --no-color --sum
|
||||
scripts/reboot-recovery/post-reboot-owner-response-preflight.py --no-color --owner-packet-file /tmp/awoooi-post-reboot-owner-packets.json
|
||||
scripts/reboot-recovery/post-reboot-owner-response-template.py --owner-packet-file /tmp/awoooi-post-reboot-owner-packets.json --output /tmp/awoooi-post-reboot-owner-response-template.json
|
||||
scripts/reboot-recovery/post-reboot-owner-response-preflight.py --no-color --owner-packet-file /tmp/awoooi-post-reboot-owner-packets.json --response-file /tmp/awoooi-post-reboot-owner-response-template.json
|
||||
scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py --no-color --summary-file "$ARTIFACT_DIR/summary.txt" --owner-packet-file /tmp/awoooi-post-reboot-owner-packets.json --response-file /tmp/awoooi-post-reboot-owner-response-template.json --offsite-report-file /tmp/awoooi-offsite-escrow-evidence-report.txt --escrow-status-file /tmp/awoooi-credential-escrow-status.txt
|
||||
```
|
||||
|
||||
前兩個 preflight 命令必須輸出 `POST_REBOOT_OWNER_RESPONSE_PREFLIGHT_BLOCKED status=blocked_waiting_owner_response_file expected_gates=<live_next_gate_count> received=0 accepted=0 runtime_gate=0`。generator 只產生 active gate 的 placeholder JSON;目前 2026-06-29 readback 只應包含 `credential_escrow_evidence`,不得額外帶入 `wazuh_manager_registry_export`。把 placeholder template 送回 preflight 時,必須輸出 `status=blocked_waiting_owner_response_content`、`received=0`、`accepted=0`、`runtime_gate=0`,證明空模板不能被算成已收件或已接受。合格 response 只能包含 active gate 要求的脫敏 evidence refs、owner role / team / decision / reviewer / followup owner、五個 escrow item 的 non-secret evidence ref;若 Wazuh gate 未來重新 active,才納入 Wazuh manager registry / Dashboard API readback。不得包含密碼、token、secret value、hash、prefix/suffix、raw Wazuh payload、agent 原名、內網 IP、`client.keys`、active response、host write、agent re-enroll、Wazuh restart、Kali active scan 或 credential marker write。preflight 通過也只代表可進入獨立 reviewer acceptance,不代表 `DR_COMPLETE`、`WAZUH_REGISTRY_RECOVERED` 或任何 runtime action 授權。
|
||||
前兩個 preflight 命令必須輸出 `POST_REBOOT_OWNER_RESPONSE_PREFLIGHT_BLOCKED status=blocked_waiting_owner_response_file expected_gates=<live_next_gate_count> received=0 accepted=0 runtime_gate=0`。generator 只產生 active gate 的 placeholder JSON;目前 2026-06-29 readback 只應包含 `credential_escrow_evidence`,不得額外帶入 `wazuh_manager_registry_export`。把 placeholder template 送回 preflight 時,必須輸出 `status=blocked_waiting_owner_response_content`、`received=0`、`accepted=0`、`runtime_gate=0`,證明空模板不能被算成已收件或已接受。credential escrow scorecard 會把 summary、owner packet、response template、offsite report 與 marker status 收成同一份 key/value;目前應維持 `STATUS=blocked_waiting_non_secret_credential_escrow_evidence`、`EFFECTIVE_ESCROW_MISSING_COUNT=5`、`OWNER_RESPONSE_RECEIVED_COUNT=0`、`OWNER_RESPONSE_ACCEPTED_COUNT=0`、`RUNTIME_GATE_COUNT=0`、`CREDENTIAL_MARKER_WRITE_AUTHORIZED_COUNT=0`。合格 response 只能包含 active gate 要求的脫敏 evidence refs、owner role / team / decision / reviewer / followup owner、五個 escrow item 的 non-secret evidence ref;若 Wazuh gate 未來重新 active,才納入 Wazuh manager registry / Dashboard API readback。不得包含密碼、token、secret value、hash、prefix/suffix、raw Wazuh payload、agent 原名、內網 IP、`client.keys`、active response、host write、agent re-enroll、Wazuh restart、Kali active scan 或 credential marker write。preflight 通過也只代表可進入獨立 reviewer acceptance,不代表 `DR_COMPLETE`、`WAZUH_REGISTRY_RECOVERED` 或任何 runtime action 授權。
|
||||
|
||||
需要展開細節時,再使用 repo-side wrapper:
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
| P0 host / K3s recovery | DONE | 100% | 120 booted after console fsck at `2026-06-12 15:13`; latest 2026-06-26 07:19 readback shows 120 and 121 reachable, K3s active, `mon` and `mon1` both `Ready control-plane`, AWOOOI API/Web replicas split across both nodes, ArgoCD `awoooi-prod Synced / Healthy` at revision `1fd5e2a8b0f18d24eed16aa2a44286bcbf230603`, and `km-vectorize` official 03:00 台北時間 run succeeded with `lastSuccess=2026-06-25T19:00:14Z`. |
|
||||
| P1 backup / alert / escrow | BLOCKED_DR_ESCROW | 98% | 2026-06-27 00:56 backup readback shows 110 `13/13 fresh failed=0`, 188 `2/2 fresh failed=0`, `core_blockers=0`, `integrity_stale=0`, `offsite_fresh=1`, `rclone_gdrive_fresh=1`, `configured_missing_188=0`, `escrow_missing=5`, last aggregate `2026-06-26 02:31:02`。188 MOMO backup crontab drift 已修復並保留 rollback crontab。DR remains blocked on real non-secret credential escrow evidence IDs; do not write placeholder markers or paste secret values. |
|
||||
| P2 service / data truth | DONE | 100% | Public routes 與 service health 為綠燈,MOMO health `V10.719`,current-month parity 為 `15383|15383|2026-06-01|2026-06-24|2026-06-01|2026-06-24`。StockPlatform `/api/v1/system/freshness` 為 `ok`,latest trading date `2026-06-26`,blockers `none`;先前 Stock EOD blocker 已由官方來源與正式 cron 自然收斂。 |
|
||||
| P3 docs / automation contracts | DONE_WITH_BACKUP_CORE_RECOVERY_V179 | 100% | Workplan, SOP v1.79, post-reboot declaration guard, machine-readable post-reboot readiness summary with Wazuh registry detail fields and auto-persisted `summary.txt`, post-reboot next-gate dispatch checklist, owner-packet JSON generator, dynamic owner-packet contract guard, post-reboot owner response preflight, active-gate owner response template generator, one-page post-start quick check v1.18, route retry gate, delegated cold-start public-route / AWOOOI API warmup classifier, backup-status core-blocker readback, PyYAML-optional recovery-scorecard contract check, 188 MOMO backup crontab host-owned rollback evidence, deploy warmup classification, expanded public route list, StockPlatform freshness gate, StockPlatform cron-source recovery evidence, StockPlatform natural schedule green evidence, 110 orphan Chrome recurrence cleanup evidence, 188 fail-closed startup data recovery gate, 188 host hygiene read-only checklist, 188 PostgreSQL runtime-ready source-of-truth, 188 ACME route/timer hygiene, baseline `stockplatform_system_freshness_ok`, BACKUP-STATUS, LOGBOOK, 120 console/fsck recovery, Gitea backup stale-dump hardening, reboot ledger/version-comparison SOP, escrow evidence audit, 188 nginx Ansible baseline, 110 cold-start detector script, startup judgment layers, GO/NO-GO tree, host recovery cards, explicit Plan B degraded-operation path, machine-readable `plan_b` baseline, readiness-audit Plan B guard, B0-B5 service levels, T+0/T+120 fallback timeline checks, host role / load-balancing assessment, CD `known_hosts` guardrail, `fwupd-refresh.timer` rollback note, K3s filesystem event blocker, AWOOOI backup no-direct-offsite-sync contract, 110/188 Ansible source-of-truth, Gitea self-hosted readiness validation workflow, post-CD no-regression readbacks, stale-vs-active K8s failed Job classification, 110 runaway browser / CI load AIOps exporter + alert + gated remediation PlayBook, Telegram / AI event packet mapping, healthy heartbeat suppression, MOMO scheduler / current-month detector fix, exporter restore helpers, 110 Docker disk pressure cleanup boundary, notification-noise readback, MOMO import-boundary / Drive-auth fail-closed deploys, product version/readback matrix, and stricter product-data / route retry gates are updated. Declaration guard now machine-checks allowed / forbidden recovery statements from the same `summary.txt`: service/data/backup/188 host hygiene green may be declared when live summary says so, while `DR_COMPLETE`、`WAZUH_REGISTRY_RECOVERED` and `RUNTIME_ACTION_AUTHORIZED` remain forbidden until evidence gates close. |
|
||||
| P3 docs / automation contracts | DONE_WITH_BACKUP_CORE_RECOVERY_V180 | 100% | Workplan, SOP v1.80, post-reboot declaration guard, machine-readable post-reboot readiness summary with Wazuh registry detail fields and auto-persisted `summary.txt`, post-reboot next-gate dispatch checklist, owner-packet JSON generator, dynamic owner-packet contract guard, post-reboot owner response preflight, active-gate owner response template generator, credential escrow intake scorecard, one-page post-start quick check v1.18, route retry gate, delegated cold-start public-route / AWOOOI API warmup classifier, backup-status core-blocker readback, PyYAML-optional recovery-scorecard contract check, 188 MOMO backup crontab host-owned rollback evidence, deploy warmup classification, expanded public route list, StockPlatform freshness gate, StockPlatform cron-source recovery evidence, StockPlatform natural schedule green evidence, 110 orphan Chrome recurrence cleanup evidence, 188 fail-closed startup data recovery gate, 188 host hygiene read-only checklist, 188 PostgreSQL runtime-ready source-of-truth, 188 ACME route/timer hygiene, baseline `stockplatform_system_freshness_ok`, BACKUP-STATUS, LOGBOOK, 120 console/fsck recovery, Gitea backup stale-dump hardening, reboot ledger/version-comparison SOP, escrow evidence audit, 188 nginx Ansible baseline, 110 cold-start detector script, startup judgment layers, GO/NO-GO tree, host recovery cards, explicit Plan B degraded-operation path, machine-readable `plan_b` baseline, readiness-audit Plan B guard, B0-B5 service levels, T+0/T+120 fallback timeline checks, host role / load-balancing assessment, CD `known_hosts` guardrail, `fwupd-refresh.timer` rollback note, K3s filesystem event blocker, AWOOOI backup no-direct-offsite-sync contract, 110/188 Ansible source-of-truth, Gitea self-hosted readiness validation workflow, post-CD no-regression readbacks, stale-vs-active K8s failed Job classification, 110 runaway browser / CI load AIOps exporter + alert + gated remediation PlayBook, Telegram / AI event packet mapping, healthy heartbeat suppression, MOMO scheduler / current-month detector fix, exporter restore helpers, 110 Docker disk pressure cleanup boundary, notification-noise readback, MOMO import-boundary / Drive-auth fail-closed deploys, product version/readback matrix, and stricter product-data / route retry gates are updated. Declaration guard now machine-checks allowed / forbidden recovery statements from the same `summary.txt`: service/data/backup/188 host hygiene green may be declared when live summary says so, while `DR_COMPLETE`、`WAZUH_REGISTRY_RECOVERED` and `RUNTIME_ACTION_AUTHORIZED` remain forbidden until evidence gates close. |
|
||||
|
||||
2026-06-26 12:13 machine-readable summary baseline supersedes the 07:47 / 08:59 gate set: `scripts/reboot-recovery/post-reboot-readiness-summary.sh --no-color` stores delegated logs under `/tmp/awoooi-post-reboot-readiness-20260626-121303` and returns `SERVICE_GREEN=1`, `PRODUCT_DATA_GREEN=1`, `BACKUP_CORE_GREEN=1`, `DR_ESCROW_BLOCKED=1`, `ESCROW_MISSING_COUNT=5`, `HOST_188_SERVICE_GREEN=1`, `HOST_188_HYGIENE_BLOCKED=0`, `HOST_188_CHECK_RC=0`, `HOST_188_RESULT=HOST_188_HYGIENE_GREEN.`, `WAZUH_ROUTE_CODE=200`, `WAZUH_TRANSPORT_COUNT=6`, `WAZUH_COVERAGE_SCOPE=6`, `WAZUH_DIRECT_ACTIVE=2`, `WAZUH_NO_TRANSPORT=1`, `WAZUH_SSH_BLOCKED=3`, `WAZUH_DASHBOARD_API_CONNECTION=pending_or_spinning`, `WAZUH_DASHBOARD_INDEX_OK=3`, `WAZUH_MANAGER_REGISTRY_ACCEPTED=0`, `WAZUH_RUNTIME_GATE=0`, `RUNTIME_ACTION_AUTHORIZED=0`, `OVERALL_DECLARATION=FULL_STACK_GREEN_DR_ESCROW_BLOCKED`, and `NEXT_REQUIRED_GATES=credential_escrow_evidence,wazuh_manager_registry_export`. This is now the preferred first operator/AI-agent entrypoint after reboot because it separates service health from DR and security registry evidence; 188 host hygiene is no longer a next gate unless the live checklist regresses.
|
||||
|
||||
|
||||
425
scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py
Executable file
425
scripts/reboot-recovery/post-reboot-credential-escrow-intake-scorecard.py
Executable file
@@ -0,0 +1,425 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Summarize credential escrow intake readiness without opening runtime gates.
|
||||
|
||||
Read-only by design. This script consumes sanitized artifacts such as the
|
||||
post-reboot summary, owner packet, placeholder/owner response, offsite report,
|
||||
and escrow marker status. It never reads secret values, writes credential
|
||||
markers, sends owner requests, or modifies host/runtime state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
OWNER_PACKET_GENERATOR = (
|
||||
ROOT / "scripts" / "reboot-recovery" / "post-reboot-next-gate-owner-packets.py"
|
||||
)
|
||||
OWNER_RESPONSE_PREFLIGHT = (
|
||||
ROOT / "scripts" / "reboot-recovery" / "post-reboot-owner-response-preflight.py"
|
||||
)
|
||||
|
||||
EXPECTED_OWNER_PACKET_SCHEMA = "awoooi_post_reboot_next_gate_owner_packets_v1"
|
||||
RESPONSE_SCHEMA = "awoooi_post_reboot_next_gate_owner_response_v1"
|
||||
ESCROW_GATE_ID = "credential_escrow_evidence"
|
||||
ESCROW_ITEM_IDS = {
|
||||
"restic_repository_password",
|
||||
"offsite_provider_credentials",
|
||||
"break_glass_admin_credentials",
|
||||
"dns_registrar_recovery",
|
||||
"oauth_ai_provider_recovery",
|
||||
}
|
||||
FORBIDDEN_TRUE_FIELDS = {
|
||||
"runtime_action_requested",
|
||||
"runtime_action_authorized",
|
||||
"host_write_requested",
|
||||
"host_write_authorized",
|
||||
"secret_value_included",
|
||||
"secret_value_collection_allowed",
|
||||
"credential_marker_write_requested",
|
||||
"credential_marker_write_authorized",
|
||||
}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Read credential escrow intake artifacts into a no-secret scorecard.",
|
||||
)
|
||||
parser.add_argument("--summary-file", type=Path, help="Post-reboot readiness summary.")
|
||||
parser.add_argument("--owner-packet-file", type=Path, help="Post-reboot owner packet JSON.")
|
||||
parser.add_argument("--response-file", type=Path, help="Owner response or placeholder JSON.")
|
||||
parser.add_argument("--offsite-report-file", type=Path, help="offsite-escrow-evidence-report output.")
|
||||
parser.add_argument("--escrow-status-file", type=Path, help="mark-credential-escrow-verified --status output.")
|
||||
parser.add_argument("--json", action="store_true", help="Print machine-readable JSON.")
|
||||
parser.add_argument("--no-color", action="store_true", help="Accepted for command symmetry; output is plain text.")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def read_text(path: Path | None) -> str:
|
||||
if not path:
|
||||
return ""
|
||||
try:
|
||||
return path.read_text(encoding="utf-8")
|
||||
except FileNotFoundError as exc:
|
||||
raise SystemExit(f"artifact_not_found={path}") from exc
|
||||
|
||||
|
||||
def load_json(path: Path, label: str) -> dict[str, Any]:
|
||||
try:
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
except FileNotFoundError as exc:
|
||||
raise SystemExit(f"{label}_not_found={path}") from exc
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SystemExit(f"{label}_json_invalid={exc}") from exc
|
||||
if not isinstance(payload, dict):
|
||||
raise SystemExit(f"{label}_json_not_object")
|
||||
return payload
|
||||
|
||||
|
||||
def parse_key_values(text: str) -> dict[str, str]:
|
||||
values: dict[str, str] = {}
|
||||
for raw_line in text.splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or "=" not in line:
|
||||
continue
|
||||
key, value = line.split("=", 1)
|
||||
key = key.strip()
|
||||
if re.fullmatch(r"[A-Z0-9_]+", key):
|
||||
values[key] = value.strip()
|
||||
return values
|
||||
|
||||
|
||||
def split_csv(value: str | None) -> list[str]:
|
||||
if not value or value == "none":
|
||||
return []
|
||||
return [item.strip() for item in value.split(",") if item.strip()]
|
||||
|
||||
|
||||
def as_list(value: Any) -> list[Any]:
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, list):
|
||||
return value
|
||||
return [value]
|
||||
|
||||
|
||||
def as_int(value: Any) -> int | None:
|
||||
if value is None:
|
||||
return None
|
||||
try:
|
||||
return int(str(value))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def int_or_unknown(value: int | None) -> int | str:
|
||||
return value if value is not None else "unknown"
|
||||
|
||||
|
||||
def bool_as_int(value: Any) -> int:
|
||||
return 1 if value is True else 0
|
||||
|
||||
|
||||
def load_owner_packet(args: argparse.Namespace) -> dict[str, Any]:
|
||||
if args.owner_packet_file:
|
||||
return load_json(args.owner_packet_file, "owner_packet_file")
|
||||
if not args.summary_file:
|
||||
return {}
|
||||
cmd = [
|
||||
str(OWNER_PACKET_GENERATOR),
|
||||
"--no-color",
|
||||
"--summary-file",
|
||||
str(args.summary_file),
|
||||
]
|
||||
completed = subprocess.run(
|
||||
cmd,
|
||||
cwd=ROOT,
|
||||
check=False,
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
if completed.returncode != 0:
|
||||
raise SystemExit(
|
||||
"owner_packet_generation_failed "
|
||||
f"rc={completed.returncode}\n{completed.stdout}"
|
||||
)
|
||||
try:
|
||||
packet = json.loads(completed.stdout)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SystemExit(f"owner_packet_json_invalid={exc}") from exc
|
||||
if not isinstance(packet, dict):
|
||||
raise SystemExit("owner_packet_json_not_object")
|
||||
return packet
|
||||
|
||||
|
||||
def owner_packet_gate_ids(packet: dict[str, Any]) -> list[str]:
|
||||
if not packet:
|
||||
return []
|
||||
if packet.get("schema_version") != EXPECTED_OWNER_PACKET_SCHEMA:
|
||||
raise SystemExit(f"owner_packet_schema={packet.get('schema_version')!r}")
|
||||
return [
|
||||
str(item.get("packet_id"))
|
||||
for item in as_list(packet.get("owner_packets"))
|
||||
if isinstance(item, dict) and item.get("packet_id")
|
||||
]
|
||||
|
||||
|
||||
def credential_required_items(packet: dict[str, Any]) -> set[str]:
|
||||
for item in as_list(packet.get("owner_packets")):
|
||||
if not isinstance(item, dict) or item.get("packet_id") != ESCROW_GATE_ID:
|
||||
continue
|
||||
return {
|
||||
str(raw_item)
|
||||
for raw_item in as_list(item.get("required_items"))
|
||||
if str(raw_item) in ESCROW_ITEM_IDS
|
||||
}
|
||||
return set()
|
||||
|
||||
|
||||
def response_gate_ids(response: dict[str, Any]) -> list[str]:
|
||||
if not response:
|
||||
return []
|
||||
if response.get("schema_version") != RESPONSE_SCHEMA:
|
||||
raise SystemExit(f"response_schema={response.get('schema_version')!r}")
|
||||
return [
|
||||
str(item.get("gate_id"))
|
||||
for item in as_list(response.get("responses"))
|
||||
if isinstance(item, dict) and item.get("gate_id")
|
||||
]
|
||||
|
||||
|
||||
def count_true_fields(value: Any) -> dict[str, int]:
|
||||
counts = {key: 0 for key in FORBIDDEN_TRUE_FIELDS}
|
||||
if isinstance(value, dict):
|
||||
for key, child in value.items():
|
||||
if key in counts and child is not False:
|
||||
counts[key] += 1
|
||||
child_counts = count_true_fields(child)
|
||||
for child_key, child_value in child_counts.items():
|
||||
counts[child_key] += child_value
|
||||
elif isinstance(value, list):
|
||||
for child in value:
|
||||
child_counts = count_true_fields(child)
|
||||
for child_key, child_value in child_counts.items():
|
||||
counts[child_key] += child_value
|
||||
return counts
|
||||
|
||||
|
||||
def run_preflight(args: argparse.Namespace) -> dict[str, Any]:
|
||||
if not args.response_file:
|
||||
return {}
|
||||
cmd = [str(OWNER_RESPONSE_PREFLIGHT), "--json", "--no-color"]
|
||||
if args.owner_packet_file:
|
||||
cmd.extend(["--owner-packet-file", str(args.owner_packet_file)])
|
||||
elif args.summary_file:
|
||||
cmd.extend(["--summary-file", str(args.summary_file)])
|
||||
else:
|
||||
return {}
|
||||
cmd.extend(["--response-file", str(args.response_file)])
|
||||
completed = subprocess.run(
|
||||
cmd,
|
||||
cwd=ROOT,
|
||||
check=False,
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
if completed.returncode != 0:
|
||||
raise SystemExit(
|
||||
"owner_response_preflight_failed "
|
||||
f"rc={completed.returncode}\n{completed.stdout}"
|
||||
)
|
||||
try:
|
||||
result = json.loads(completed.stdout)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SystemExit(f"owner_response_preflight_json_invalid={exc}") from exc
|
||||
if not isinstance(result, dict):
|
||||
raise SystemExit("owner_response_preflight_json_not_object")
|
||||
return result
|
||||
|
||||
|
||||
def parse_escrow_status(text: str) -> dict[str, int]:
|
||||
seen = 0
|
||||
missing = 0
|
||||
present = 0
|
||||
for raw_line in text.splitlines():
|
||||
parts = raw_line.strip().split()
|
||||
if len(parts) < 2 or parts[0] not in ESCROW_ITEM_IDS:
|
||||
continue
|
||||
seen += 1
|
||||
state = parts[1].lower()
|
||||
if state == "missing":
|
||||
missing += 1
|
||||
elif state in {"present", "verified", "fresh", "ok"}:
|
||||
present += 1
|
||||
return {"seen": seen, "missing": missing, "present": present}
|
||||
|
||||
|
||||
def first_known(*values: int | None) -> int | None:
|
||||
for value in values:
|
||||
if value is not None:
|
||||
return value
|
||||
return None
|
||||
|
||||
|
||||
def evaluate(args: argparse.Namespace) -> dict[str, Any]:
|
||||
summary = parse_key_values(read_text(args.summary_file))
|
||||
offsite = parse_key_values(read_text(args.offsite_report_file))
|
||||
escrow_status = parse_escrow_status(read_text(args.escrow_status_file))
|
||||
packet = load_owner_packet(args)
|
||||
response = load_json(args.response_file, "response_file") if args.response_file else {}
|
||||
preflight = run_preflight(args)
|
||||
|
||||
summary_gates = split_csv(summary.get("NEXT_REQUIRED_GATES"))
|
||||
packet_gates = owner_packet_gate_ids(packet)
|
||||
response_gates = response_gate_ids(response)
|
||||
unexpected_packet_gates = sorted(set(packet_gates) - {ESCROW_GATE_ID})
|
||||
unexpected_response_gates = sorted(set(response_gates) - set(packet_gates or summary_gates))
|
||||
|
||||
required_items = credential_required_items(packet)
|
||||
missing_required_items = sorted(ESCROW_ITEM_IDS - required_items) if ESCROW_GATE_ID in packet_gates else []
|
||||
|
||||
summary_missing = as_int(summary.get("ESCROW_MISSING_COUNT"))
|
||||
offsite_missing = first_known(
|
||||
as_int(offsite.get("MISSING_ESCROW_MARKER_COUNT")),
|
||||
as_int(offsite.get("ESCROW_MISSING_COUNT")),
|
||||
)
|
||||
status_missing = escrow_status["missing"] if escrow_status["seen"] else None
|
||||
effective_missing = first_known(offsite_missing, summary_missing, status_missing)
|
||||
|
||||
true_counts = count_true_fields(response)
|
||||
forbidden_true_total = sum(true_counts.values())
|
||||
preflight_status = str(preflight.get("status", "not_run"))
|
||||
preflight_blockers = as_list(preflight.get("blockers"))
|
||||
active_gate_present = ESCROW_GATE_ID in set(summary_gates or packet_gates)
|
||||
|
||||
if not active_gate_present:
|
||||
status = "not_required_current_summary"
|
||||
next_step = "rerun_post_reboot_summary_when_next_required_gates_change"
|
||||
elif forbidden_true_total:
|
||||
status = "blocked_forbidden_runtime_or_marker_request"
|
||||
next_step = "strip_runtime_secret_host_write_or_marker_write_fields_before_preflight"
|
||||
elif unexpected_packet_gates or unexpected_response_gates:
|
||||
status = "blocked_owner_packet_or_response_gate_mismatch"
|
||||
next_step = "regenerate_owner_packet_and_response_template_from_same_summary"
|
||||
elif preflight_status == "ready_for_independent_reviewer_acceptance" and effective_missing == 0:
|
||||
status = "ready_for_independent_reviewer_acceptance"
|
||||
next_step = "independent_reviewer_acceptance_then_marker_dry_run"
|
||||
else:
|
||||
status = "blocked_waiting_non_secret_credential_escrow_evidence"
|
||||
next_step = "collect_redacted_non_secret_evidence_refs_then_rerun_preflight"
|
||||
|
||||
result = {
|
||||
"schema_version": "awoooi_post_reboot_credential_escrow_intake_scorecard_v1",
|
||||
"status": status,
|
||||
"next_step": next_step,
|
||||
"active_gate_present": active_gate_present,
|
||||
"summary_next_required_gates": summary_gates,
|
||||
"owner_packet_gate_count": len(packet_gates),
|
||||
"owner_packet_gates": packet_gates,
|
||||
"unexpected_owner_packet_gate_count": len(unexpected_packet_gates),
|
||||
"unexpected_owner_packet_gates": unexpected_packet_gates,
|
||||
"response_gate_count": len(response_gates),
|
||||
"response_gates": response_gates,
|
||||
"unexpected_response_gate_count": len(unexpected_response_gates),
|
||||
"unexpected_response_gates": unexpected_response_gates,
|
||||
"required_item_count": len(required_items),
|
||||
"missing_required_item_count": len(missing_required_items),
|
||||
"missing_required_items": missing_required_items,
|
||||
"summary_escrow_missing_count": int_or_unknown(summary_missing),
|
||||
"offsite_escrow_missing_count": int_or_unknown(offsite_missing),
|
||||
"escrow_status_seen_count": escrow_status["seen"],
|
||||
"escrow_status_missing_count": int_or_unknown(status_missing),
|
||||
"effective_escrow_missing_count": int_or_unknown(effective_missing),
|
||||
"script_missing_count": int_or_unknown(as_int(offsite.get("SCRIPT_MISSING_COUNT"))),
|
||||
"offsite_configured": int_or_unknown(as_int(offsite.get("OFFSITE_CONFIGURED"))),
|
||||
"rclone_configured": int_or_unknown(as_int(offsite.get("RCLONE_CONFIGURED"))),
|
||||
"preflight_status": preflight_status,
|
||||
"preflight_blocker_count": len(preflight_blockers),
|
||||
"owner_response_received_count": preflight.get("owner_response_received_count", 0),
|
||||
"owner_response_accepted_count": preflight.get("owner_response_accepted_count", 0),
|
||||
"runtime_gate_count": preflight.get("runtime_gate_count", 0),
|
||||
"runtime_action_authorized": bool_as_int(preflight.get("runtime_action_authorized")),
|
||||
"host_write_authorized": bool_as_int(preflight.get("host_write_authorized")),
|
||||
"secret_value_collection_allowed": bool_as_int(preflight.get("secret_value_collection_allowed")),
|
||||
"runtime_action_requested_count": true_counts["runtime_action_requested"],
|
||||
"host_write_requested_count": true_counts["host_write_requested"],
|
||||
"secret_value_included_count": true_counts["secret_value_included"],
|
||||
"secret_value_collection_allowed_count": true_counts["secret_value_collection_allowed"],
|
||||
"credential_marker_write_requested_count": true_counts["credential_marker_write_requested"],
|
||||
"credential_marker_write_authorized_count": true_counts["credential_marker_write_authorized"],
|
||||
"forbidden_true_field_count": forbidden_true_total,
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
def csv_value(value: Any) -> str:
|
||||
if isinstance(value, list):
|
||||
return ",".join(str(item) for item in value) if value else "none"
|
||||
if isinstance(value, bool):
|
||||
return "1" if value else "0"
|
||||
return str(value)
|
||||
|
||||
|
||||
def print_key_values(result: dict[str, Any]) -> None:
|
||||
print("POST_REBOOT_CREDENTIAL_ESCROW_INTAKE_SCORECARD=1")
|
||||
ordered_keys = [
|
||||
"status",
|
||||
"next_step",
|
||||
"active_gate_present",
|
||||
"summary_next_required_gates",
|
||||
"owner_packet_gate_count",
|
||||
"owner_packet_gates",
|
||||
"unexpected_owner_packet_gate_count",
|
||||
"response_gate_count",
|
||||
"response_gates",
|
||||
"unexpected_response_gate_count",
|
||||
"required_item_count",
|
||||
"missing_required_item_count",
|
||||
"summary_escrow_missing_count",
|
||||
"offsite_escrow_missing_count",
|
||||
"escrow_status_seen_count",
|
||||
"escrow_status_missing_count",
|
||||
"effective_escrow_missing_count",
|
||||
"script_missing_count",
|
||||
"offsite_configured",
|
||||
"rclone_configured",
|
||||
"preflight_status",
|
||||
"preflight_blocker_count",
|
||||
"owner_response_received_count",
|
||||
"owner_response_accepted_count",
|
||||
"runtime_gate_count",
|
||||
"runtime_action_authorized",
|
||||
"host_write_authorized",
|
||||
"secret_value_collection_allowed",
|
||||
"runtime_action_requested_count",
|
||||
"host_write_requested_count",
|
||||
"secret_value_included_count",
|
||||
"credential_marker_write_requested_count",
|
||||
"credential_marker_write_authorized_count",
|
||||
"forbidden_true_field_count",
|
||||
]
|
||||
for key in ordered_keys:
|
||||
print(f"{key.upper()}={csv_value(result.get(key))}")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
result = evaluate(args)
|
||||
if args.json:
|
||||
print(json.dumps(result, ensure_ascii=False, indent=2, sort_keys=True))
|
||||
else:
|
||||
print_key_values(result)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,229 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[3]
|
||||
SCORECARD_SCRIPT = (
|
||||
ROOT / "scripts" / "reboot-recovery" / "post-reboot-credential-escrow-intake-scorecard.py"
|
||||
)
|
||||
TEMPLATE_SCRIPT = ROOT / "scripts" / "reboot-recovery" / "post-reboot-owner-response-template.py"
|
||||
|
||||
ESCROW_ITEMS = [
|
||||
"restic_repository_password",
|
||||
"offsite_provider_credentials",
|
||||
"break_glass_admin_credentials",
|
||||
"dns_registrar_recovery",
|
||||
"oauth_ai_provider_recovery",
|
||||
]
|
||||
|
||||
|
||||
def write_text(path: Path, text: str) -> Path:
|
||||
path.write_text(text, encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
def write_packet(tmp_path: Path, gate_ids: list[str]) -> Path:
|
||||
owner_packets = []
|
||||
for gate_id in gate_ids:
|
||||
owner_packets.append(
|
||||
{
|
||||
"packet_id": gate_id,
|
||||
"title": f"{gate_id} owner evidence",
|
||||
"priority": "P0",
|
||||
"required_items": ESCROW_ITEMS if gate_id == "credential_escrow_evidence" else [],
|
||||
}
|
||||
)
|
||||
packet_path = tmp_path / "owner-packets.json"
|
||||
packet_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"schema_version": "awoooi_post_reboot_next_gate_owner_packets_v1",
|
||||
"source": {"next_required_gates": gate_ids},
|
||||
"owner_packets": owner_packets,
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return packet_path
|
||||
|
||||
|
||||
def generate_template(packet_path: Path, tmp_path: Path) -> Path:
|
||||
response_path = tmp_path / "owner-response-template.json"
|
||||
subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
str(TEMPLATE_SCRIPT),
|
||||
"--owner-packet-file",
|
||||
str(packet_path),
|
||||
"--output",
|
||||
str(response_path),
|
||||
],
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=True,
|
||||
)
|
||||
return response_path
|
||||
|
||||
|
||||
def run_scorecard(
|
||||
summary_path: Path,
|
||||
packet_path: Path,
|
||||
response_path: Path,
|
||||
offsite_path: Path,
|
||||
escrow_status_path: Path,
|
||||
) -> dict:
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
str(SCORECARD_SCRIPT),
|
||||
"--summary-file",
|
||||
str(summary_path),
|
||||
"--owner-packet-file",
|
||||
str(packet_path),
|
||||
"--response-file",
|
||||
str(response_path),
|
||||
"--offsite-report-file",
|
||||
str(offsite_path),
|
||||
"--escrow-status-file",
|
||||
str(escrow_status_path),
|
||||
"--json",
|
||||
],
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=True,
|
||||
)
|
||||
return json.loads(result.stdout)
|
||||
|
||||
|
||||
def write_common_artifacts(tmp_path: Path) -> tuple[Path, Path, Path]:
|
||||
summary_path = write_text(
|
||||
tmp_path / "summary.txt",
|
||||
"""\
|
||||
SERVICE_GREEN=1
|
||||
NEXT_REQUIRED_GATES=credential_escrow_evidence
|
||||
ESCROW_MISSING_COUNT=5
|
||||
RUNTIME_ACTION_AUTHORIZED=0
|
||||
""",
|
||||
)
|
||||
offsite_path = write_text(
|
||||
tmp_path / "offsite-report.txt",
|
||||
"""\
|
||||
SCRIPT_MISSING_COUNT=0
|
||||
OFFSITE_CONFIGURED=1
|
||||
RCLONE_CONFIGURED=1
|
||||
MISSING_ESCROW_MARKER_COUNT=5
|
||||
ESCROW_MISSING_COUNT=5
|
||||
""",
|
||||
)
|
||||
escrow_status_path = write_text(
|
||||
tmp_path / "escrow-status.txt",
|
||||
"\n".join(f"{item} missing" for item in ESCROW_ITEMS) + "\n",
|
||||
)
|
||||
return summary_path, offsite_path, escrow_status_path
|
||||
|
||||
|
||||
def test_scorecard_blocks_until_non_secret_evidence_refs_exist(tmp_path: Path) -> None:
|
||||
summary_path, offsite_path, escrow_status_path = write_common_artifacts(tmp_path)
|
||||
packet_path = write_packet(tmp_path, ["credential_escrow_evidence"])
|
||||
response_path = generate_template(packet_path, tmp_path)
|
||||
|
||||
scorecard = run_scorecard(
|
||||
summary_path,
|
||||
packet_path,
|
||||
response_path,
|
||||
offsite_path,
|
||||
escrow_status_path,
|
||||
)
|
||||
|
||||
assert scorecard["status"] == "blocked_waiting_non_secret_credential_escrow_evidence"
|
||||
assert scorecard["active_gate_present"] is True
|
||||
assert scorecard["owner_packet_gates"] == ["credential_escrow_evidence"]
|
||||
assert scorecard["response_gates"] == ["credential_escrow_evidence"]
|
||||
assert scorecard["unexpected_response_gate_count"] == 0
|
||||
assert scorecard["required_item_count"] == 5
|
||||
assert scorecard["effective_escrow_missing_count"] == 5
|
||||
assert scorecard["script_missing_count"] == 0
|
||||
assert scorecard["offsite_configured"] == 1
|
||||
assert scorecard["rclone_configured"] == 1
|
||||
assert scorecard["preflight_status"] == "blocked_waiting_owner_response_content"
|
||||
assert scorecard["owner_response_received_count"] == 0
|
||||
assert scorecard["owner_response_accepted_count"] == 0
|
||||
assert scorecard["runtime_gate_count"] == 0
|
||||
assert scorecard["runtime_action_authorized"] == 0
|
||||
assert scorecard["host_write_authorized"] == 0
|
||||
assert scorecard["secret_value_collection_allowed"] == 0
|
||||
assert scorecard["credential_marker_write_requested_count"] == 0
|
||||
assert scorecard["credential_marker_write_authorized_count"] == 0
|
||||
|
||||
|
||||
def test_scorecard_blocks_forbidden_runtime_or_marker_requests(tmp_path: Path) -> None:
|
||||
summary_path, offsite_path, escrow_status_path = write_common_artifacts(tmp_path)
|
||||
packet_path = write_packet(tmp_path, ["credential_escrow_evidence"])
|
||||
response_path = generate_template(packet_path, tmp_path)
|
||||
response = json.loads(response_path.read_text(encoding="utf-8"))
|
||||
response["responses"][0]["runtime_action_requested"] = True
|
||||
response["responses"][0]["credential_marker_write_authorized"] = True
|
||||
response_path.write_text(json.dumps(response, indent=2) + "\n", encoding="utf-8")
|
||||
|
||||
scorecard = run_scorecard(
|
||||
summary_path,
|
||||
packet_path,
|
||||
response_path,
|
||||
offsite_path,
|
||||
escrow_status_path,
|
||||
)
|
||||
|
||||
assert scorecard["status"] == "blocked_forbidden_runtime_or_marker_request"
|
||||
assert scorecard["runtime_action_requested_count"] == 1
|
||||
assert scorecard["credential_marker_write_authorized_count"] == 1
|
||||
assert scorecard["forbidden_true_field_count"] == 2
|
||||
assert scorecard["owner_response_accepted_count"] == 0
|
||||
assert scorecard["runtime_gate_count"] == 0
|
||||
|
||||
|
||||
def test_scorecard_rejects_stale_extra_wazuh_response_gate(tmp_path: Path) -> None:
|
||||
summary_path, offsite_path, escrow_status_path = write_common_artifacts(tmp_path)
|
||||
packet_path = write_packet(tmp_path, ["credential_escrow_evidence"])
|
||||
response_path = generate_template(packet_path, tmp_path)
|
||||
response = json.loads(response_path.read_text(encoding="utf-8"))
|
||||
response["responses"].append(
|
||||
{
|
||||
"gate_id": "wazuh_manager_registry_export",
|
||||
"owner_role": "owner_role_here",
|
||||
"owner_team": "owner_team_here",
|
||||
"decision": "pending",
|
||||
"decision_reason": "decision_reason_here",
|
||||
"affected_scope": "stale Wazuh response should not be active",
|
||||
"redacted_evidence_refs": ["redacted_evidence_ref_here"],
|
||||
"followup_owner": "followup_owner_here",
|
||||
"runtime_action_requested": False,
|
||||
"host_write_requested": False,
|
||||
"secret_value_included": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
}
|
||||
)
|
||||
response_path.write_text(json.dumps(response, indent=2) + "\n", encoding="utf-8")
|
||||
|
||||
scorecard = run_scorecard(
|
||||
summary_path,
|
||||
packet_path,
|
||||
response_path,
|
||||
offsite_path,
|
||||
escrow_status_path,
|
||||
)
|
||||
|
||||
assert scorecard["status"] == "blocked_owner_packet_or_response_gate_mismatch"
|
||||
assert scorecard["response_gates"] == [
|
||||
"credential_escrow_evidence",
|
||||
"wazuh_manager_registry_export",
|
||||
]
|
||||
assert scorecard["unexpected_response_gate_count"] == 1
|
||||
assert scorecard["unexpected_response_gates"] == ["wazuh_manager_registry_export"]
|
||||
assert scorecard["owner_response_received_count"] == 0
|
||||
assert scorecard["runtime_gate_count"] == 0
|
||||
Reference in New Issue
Block a user