feat(iwooos): expose wazuh manager registry reviewer validation
Some checks failed
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m37s
CD Pipeline / build-and-deploy (push) Successful in 4m43s
CD Pipeline / post-deploy-checks (push) Has been cancelled
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
Some checks failed
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m37s
CD Pipeline / build-and-deploy (push) Successful in 4m43s
CD Pipeline / post-deploy-checks (push) Has been cancelled
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
This commit is contained in:
@@ -127,6 +127,10 @@ def validate(root: Path) -> None:
|
||||
str(root / "scripts" / "security" / "wazuh-managed-host-coverage-gate.py")
|
||||
)
|
||||
wazuh_managed_host_coverage_gate["validate"](root)
|
||||
wazuh_manager_registry_reviewer_validation = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "wazuh-manager-registry-reviewer-validation.py")
|
||||
)
|
||||
wazuh_manager_registry_reviewer_validation["validate"](root)
|
||||
telegram_alert_readability_guard = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "telegram-alert-readability-guard.py")
|
||||
)
|
||||
@@ -345,6 +349,15 @@ def validate(root: Path) -> None:
|
||||
iwooos_wazuh_managed_host_coverage_test = (
|
||||
root / "apps" / "api" / "tests" / "test_iwooos_wazuh_managed_host_coverage.py"
|
||||
).read_text(encoding="utf-8")
|
||||
iwooos_wazuh_manager_registry_reviewer_validation_service = (
|
||||
root / "apps" / "api" / "src" / "services" / "iwooos_wazuh_manager_registry_reviewer_validation.py"
|
||||
).read_text(encoding="utf-8")
|
||||
iwooos_wazuh_manager_registry_reviewer_validation_test = (
|
||||
root / "apps" / "api" / "tests" / "test_iwooos_wazuh_manager_registry_reviewer_validation.py"
|
||||
).read_text(encoding="utf-8")
|
||||
wazuh_manager_registry_reviewer_validation_script = (
|
||||
root / "scripts" / "security" / "wazuh-manager-registry-reviewer-validation.py"
|
||||
).read_text(encoding="utf-8")
|
||||
tenants_api_contract = (
|
||||
root / "apps" / "api" / "src" / "api" / "v1" / "platform" / "tenants.py"
|
||||
).read_text(encoding="utf-8")
|
||||
@@ -29529,6 +29542,9 @@ def validate(root: Path) -> None:
|
||||
iwooos_api_client,
|
||||
iwooos_wazuh_managed_host_coverage_service,
|
||||
iwooos_wazuh_managed_host_coverage_test,
|
||||
iwooos_wazuh_manager_registry_reviewer_validation_service,
|
||||
iwooos_wazuh_manager_registry_reviewer_validation_test,
|
||||
wazuh_manager_registry_reviewer_validation_script,
|
||||
]
|
||||
)
|
||||
for expected in [
|
||||
@@ -29551,6 +29567,16 @@ def validate(root: Path) -> None:
|
||||
"Wazuh 主機覆蓋只讀 API 已接上",
|
||||
"wazuh_managed_host_coverage_manager_registry_accepted_count=0",
|
||||
"wazuh_managed_host_coverage_runtime_gate_count=0",
|
||||
"iwooos-wazuh-manager-registry-reviewer-validation-board",
|
||||
"iwooos-wazuh-manager-registry-reviewer-validation-slots",
|
||||
"wazuhManagerRegistryReviewerValidation",
|
||||
"getIwoooSWazuhManagerRegistryReviewerValidation",
|
||||
"apiClient.getIwoooSWazuhManagerRegistryReviewerValidation",
|
||||
"Wazuh manager registry reviewer validation 已讀回",
|
||||
"wazuh_manager_registry_reviewer_validation_owner_registry_export_received_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_owner_registry_export_accepted_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_manager_registry_accepted_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_runtime_gate_count=0",
|
||||
]:
|
||||
assert_text_contains("iwooos_frontend_product_text.wazuh_managed_host_coverage", frontend_product_text, expected)
|
||||
for expected in [
|
||||
@@ -29565,6 +29591,14 @@ def validate(root: Path) -> None:
|
||||
"wazuh_managed_host_coverage_required_evidence_accepted_count=0",
|
||||
"wazuh_agent_reenroll_authorized=false",
|
||||
"wazuh_agent_restart_authorized=false",
|
||||
"/api/v1/iwooos/wazuh-manager-registry-reviewer-validation",
|
||||
"wazuh_manager_registry_reviewer_validation_v1",
|
||||
"iwooos_wazuh_manager_registry_reviewer_validation_readback_v1",
|
||||
"test_iwooos_wazuh_manager_registry_reviewer_validation_api_is_public_safe",
|
||||
"wazuh_manager_registry_reviewer_validation_owner_registry_export_received_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_owner_registry_export_accepted_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_manager_registry_accepted_count=0",
|
||||
"wazuh_manager_registry_reviewer_validation_runtime_gate_count=0",
|
||||
]:
|
||||
assert_text_contains(
|
||||
"iwooos_wazuh_managed_host_coverage_source",
|
||||
|
||||
458
scripts/security/wazuh-manager-registry-reviewer-validation.py
Normal file
458
scripts/security/wazuh-manager-registry-reviewer-validation.py
Normal file
@@ -0,0 +1,458 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Wazuh manager registry reviewer validation gate.
|
||||
|
||||
本工具只驗證 repo 內 committed snapshot;不查 Wazuh、不讀 host、不收
|
||||
secret、不保存 raw payload、不重新註冊 agent、不重啟服務,也不啟用
|
||||
active response。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
SNAPSHOT_PATH = Path("docs/security/wazuh-manager-registry-reviewer-validation.snapshot.json")
|
||||
SCHEMA_VERSION = "wazuh_manager_registry_reviewer_validation_v1"
|
||||
|
||||
EXPECTED_SCOPE_ALIASES = [
|
||||
"managed_core_node_a",
|
||||
"managed_core_node_b",
|
||||
"managed_dev_node_a",
|
||||
"managed_dev_node_b",
|
||||
"managed_control_node_a",
|
||||
"managed_control_node_b",
|
||||
]
|
||||
|
||||
REQUIRED_OWNER_FIELDS = [
|
||||
"owner_role",
|
||||
"team",
|
||||
"decision",
|
||||
"decision_reason",
|
||||
"affected_scope",
|
||||
"collection_method",
|
||||
"agent_total",
|
||||
"agent_active",
|
||||
"agent_disconnected",
|
||||
"agent_never_connected",
|
||||
"last_seen_window_start",
|
||||
"last_seen_window_end",
|
||||
"registry_collected_at",
|
||||
"registry_export_scope_aliases",
|
||||
"per_host_registry_matrix",
|
||||
"registry_gap_reason_by_alias",
|
||||
"registry_export_summary_ref",
|
||||
"manager_health_ref",
|
||||
"dashboard_api_status_ref",
|
||||
"dashboard_api_connection_check_status",
|
||||
"dashboard_api_version_check_status",
|
||||
"dashboard_index_pattern_statuses",
|
||||
"dashboard_api_degradation_root_cause",
|
||||
"dashboard_api_repair_postcheck_ref",
|
||||
"redacted_evidence_refs",
|
||||
"followup_owner",
|
||||
"rollback_owner",
|
||||
"postcheck_plan",
|
||||
]
|
||||
|
||||
PER_HOST_REQUIRED_FIELDS = [
|
||||
"node_alias",
|
||||
"scope_role",
|
||||
"registry_presence",
|
||||
"agent_status_bucket",
|
||||
"last_seen_state",
|
||||
"manager_group_ref",
|
||||
"agent_id_redacted_ref",
|
||||
"gap_reason",
|
||||
"redacted_evidence_ref",
|
||||
]
|
||||
|
||||
REVIEWER_VALIDATION_CHECKS = [
|
||||
{
|
||||
"check_id": "RV-01",
|
||||
"title": "Export envelope 欄位齊全",
|
||||
"required_evidence": "owner_role、team、decision、decision_reason、affected_scope、collection_method 與 redacted evidence refs 必須存在。",
|
||||
"failure_lane": "request_missing_fields",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-02",
|
||||
"title": "Registry counts 算術一致",
|
||||
"required_evidence": "agent_total 不得小於 active + disconnected + never_connected,且需有 registry_export_summary_ref。",
|
||||
"failure_lane": "request_counts_arithmetic_fix",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-03",
|
||||
"title": "Alias scope 與 IwoooS 覆蓋矩陣一致",
|
||||
"required_evidence": "registry_export_scope_aliases 必須剛好覆蓋 6 個公開別名,不得加入真實主機名或內網識別。",
|
||||
"failure_lane": "request_alias_scope_parity_fix",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-04",
|
||||
"title": "逐主機矩陣欄位完整",
|
||||
"required_evidence": "每個公開節點別名都要有 9 個 per-host 欄位與 gap reason。",
|
||||
"failure_lane": "request_per_host_matrix_supplement",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-05",
|
||||
"title": "Dashboard API 狀態不可用 index pattern 代替",
|
||||
"required_evidence": "API connection、API version、index pattern、degradation root cause 與 repair postcheck 需分欄。",
|
||||
"failure_lane": "request_dashboard_api_repair_postcheck",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-06",
|
||||
"title": "Manager health 與 readonly credential metadata 可追溯",
|
||||
"required_evidence": "manager_health_ref 與 readonly credential metadata 只能是脫敏來源資訊,不得含 secret value。",
|
||||
"failure_lane": "request_readonly_credential_metadata",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-07",
|
||||
"title": "Forbidden payload 一律隔離",
|
||||
"required_evidence": "不得含 raw Wazuh payload、完整 CLI output、未脫敏截圖、agent 原名、內網位址、token、密碼或 client key。",
|
||||
"failure_lane": "quarantine_sensitive_payload",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-08",
|
||||
"title": "Owner / followup / rollback 責任可讀",
|
||||
"required_evidence": "followup_owner、rollback_owner、維護窗口與 postcheck plan 必須能被 reviewer 追蹤。",
|
||||
"failure_lane": "request_owner_accountability_supplement",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-09",
|
||||
"title": "收件不等於 runtime 授權",
|
||||
"required_evidence": "owner decision 不可夾帶 active response、agent restart、reenroll、host write、Nginx、firewall、K8s 或 secret rotation。",
|
||||
"failure_lane": "reject_runtime_action_request",
|
||||
},
|
||||
{
|
||||
"check_id": "RV-10",
|
||||
"title": "Post-enable IwoooS readback 仍是下一關",
|
||||
"required_evidence": "即使 reviewer 未來接受 evidence,也只能進 read-only posture;必須另有 post-enable readback 才能更新 runtime truth。",
|
||||
"failure_lane": "waiting_post_enable_iwooos_readback",
|
||||
},
|
||||
]
|
||||
|
||||
OUTCOME_LANES = [
|
||||
"waiting_owner_registry_export",
|
||||
"request_missing_fields",
|
||||
"request_counts_arithmetic_fix",
|
||||
"request_alias_scope_parity_fix",
|
||||
"request_per_host_matrix_supplement",
|
||||
"request_dashboard_api_repair_postcheck",
|
||||
"request_readonly_credential_metadata",
|
||||
"request_owner_accountability_supplement",
|
||||
"quarantine_sensitive_payload",
|
||||
"reject_runtime_action_request",
|
||||
"ready_for_reviewer_validation",
|
||||
"accepted_for_readonly_posture_only",
|
||||
"waiting_post_enable_iwooos_readback",
|
||||
]
|
||||
|
||||
EVIDENCE_SLOTS = [
|
||||
{
|
||||
"slot_id": "manager_registry_agent_counts",
|
||||
"title": "Manager registry agent counts",
|
||||
"required_fields": ["agent_total", "agent_active", "agent_disconnected", "agent_never_connected", "registry_export_summary_ref"],
|
||||
},
|
||||
{
|
||||
"slot_id": "per_host_agent_scope_matrix",
|
||||
"title": "逐主機 agent scope matrix",
|
||||
"required_fields": ["registry_export_scope_aliases", "per_host_registry_matrix", "registry_gap_reason_by_alias"],
|
||||
},
|
||||
{
|
||||
"slot_id": "dashboard_api_rbac_tls_repair_readback",
|
||||
"title": "Dashboard API / RBAC / TLS 修復讀回",
|
||||
"required_fields": [
|
||||
"dashboard_api_connection_check_status",
|
||||
"dashboard_api_version_check_status",
|
||||
"dashboard_index_pattern_statuses",
|
||||
"dashboard_api_degradation_root_cause",
|
||||
"dashboard_api_repair_postcheck_ref",
|
||||
],
|
||||
},
|
||||
{
|
||||
"slot_id": "readonly_credential_metadata_without_secret",
|
||||
"title": "唯讀 credential metadata,不含 secret",
|
||||
"required_fields": ["collection_method", "manager_health_ref", "redacted_evidence_refs"],
|
||||
},
|
||||
{
|
||||
"slot_id": "owner_response_and_rollback_owner",
|
||||
"title": "Owner response / rollback owner",
|
||||
"required_fields": ["owner_role", "team", "decision", "decision_reason", "followup_owner", "rollback_owner"],
|
||||
},
|
||||
{
|
||||
"slot_id": "post_enable_iwooos_readback",
|
||||
"title": "Post-enable IwoooS readback",
|
||||
"required_fields": ["postcheck_plan", "redacted_evidence_refs"],
|
||||
},
|
||||
]
|
||||
|
||||
FORBIDDEN_PAYLOADS = [
|
||||
"raw_wazuh_payload",
|
||||
"raw_log",
|
||||
"full_journal",
|
||||
"full_cli_output",
|
||||
"unredacted_screenshot",
|
||||
"agent_name",
|
||||
"agent_id_plaintext",
|
||||
"internal_ip",
|
||||
"hostname",
|
||||
"authorization_header",
|
||||
"bearer_token",
|
||||
"basic_auth",
|
||||
"password",
|
||||
"token",
|
||||
"cookie",
|
||||
"private_key",
|
||||
"client_keys",
|
||||
"raw_dashboard_request",
|
||||
"dashboard_api_secret",
|
||||
"stored_api_password",
|
||||
"api_token",
|
||||
"active_response_enable",
|
||||
"agent_reenroll",
|
||||
"agent_restart",
|
||||
"host_write",
|
||||
"firewall_change",
|
||||
"nginx_reload",
|
||||
]
|
||||
|
||||
FORBIDDEN_ACTIONS = [
|
||||
"wazuh_api_live_query",
|
||||
"wazuh_agent_reenroll",
|
||||
"wazuh_agent_restart",
|
||||
"wazuh_manager_restart",
|
||||
"wazuh_active_response",
|
||||
"wazuh_dashboard_secret_patch",
|
||||
"host_write",
|
||||
"firewall_change",
|
||||
"nginx_reload",
|
||||
"k8s_or_argocd_change",
|
||||
"kali_active_scan",
|
||||
]
|
||||
|
||||
FORBIDDEN_TEXT_PATTERNS = [
|
||||
re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b"),
|
||||
re.compile(r"Authorization\s*:", re.IGNORECASE),
|
||||
re.compile(r"Bearer\s+[A-Za-z0-9._-]{10,}", re.IGNORECASE),
|
||||
re.compile(r"Basic\s+[A-Za-z0-9+/=]{10,}", re.IGNORECASE),
|
||||
re.compile(r"password\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"token\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"cookie\s*[:=]\s*['\"][^'\"]+['\"]", re.IGNORECASE),
|
||||
re.compile(r"client\.keys", re.IGNORECASE),
|
||||
re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
|
||||
]
|
||||
|
||||
|
||||
def load_json(path: Path) -> dict[str, Any]:
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def assert_equal(label: str, actual: Any, expected: Any) -> None:
|
||||
if actual != expected:
|
||||
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
|
||||
|
||||
|
||||
def assert_false(label: str, actual: Any) -> None:
|
||||
assert_equal(label, actual, False)
|
||||
|
||||
|
||||
def assert_zero(label: str, actual: Any) -> None:
|
||||
assert_equal(label, actual, 0)
|
||||
|
||||
|
||||
def collect_string_values(value: Any) -> list[str]:
|
||||
if isinstance(value, str):
|
||||
return [value]
|
||||
if isinstance(value, list):
|
||||
values: list[str] = []
|
||||
for item in value:
|
||||
values.extend(collect_string_values(item))
|
||||
return values
|
||||
if isinstance(value, dict):
|
||||
values = []
|
||||
for item in value.values():
|
||||
values.extend(collect_string_values(item))
|
||||
return values
|
||||
return []
|
||||
|
||||
|
||||
def validate_no_forbidden_text(snapshot: dict[str, Any]) -> None:
|
||||
for text in collect_string_values(snapshot):
|
||||
for pattern in FORBIDDEN_TEXT_PATTERNS:
|
||||
if pattern.search(text):
|
||||
raise SystemExit(
|
||||
"BLOCKED wazuh_manager_registry_reviewer_validation: snapshot contains forbidden sensitive text"
|
||||
)
|
||||
|
||||
|
||||
def build_snapshot(generated_at: str) -> dict[str, Any]:
|
||||
return {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"generated_at": generated_at,
|
||||
"status": "waiting_owner_registry_export_for_reviewer_validation",
|
||||
"mode": "committed_validation_contract_no_runtime_no_secret_collection",
|
||||
"scope": "wazuh_manager_registry_owner_export_reviewer_validation",
|
||||
"source_refs": [
|
||||
"docs/security/wazuh-agent-visibility-owner-evidence-preflight.snapshot.json",
|
||||
"docs/security/wazuh-managed-host-coverage-gate.snapshot.json",
|
||||
],
|
||||
"summary": {
|
||||
"expected_scope_alias_count": len(EXPECTED_SCOPE_ALIASES),
|
||||
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
|
||||
"per_host_required_field_count": len(PER_HOST_REQUIRED_FIELDS),
|
||||
"reviewer_validation_check_count": len(REVIEWER_VALIDATION_CHECKS),
|
||||
"outcome_lane_count": len(OUTCOME_LANES),
|
||||
"evidence_slot_count": len(EVIDENCE_SLOTS),
|
||||
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
|
||||
"forbidden_action_count": len(FORBIDDEN_ACTIONS),
|
||||
"owner_registry_export_received_count": 0,
|
||||
"owner_registry_export_accepted_count": 0,
|
||||
"reviewer_validation_ready_count": 0,
|
||||
"reviewer_validation_passed_count": 0,
|
||||
"reviewer_validation_failed_count": 0,
|
||||
"reviewer_validation_quarantined_count": 0,
|
||||
"manager_registry_accepted_count": 0,
|
||||
"post_enable_readback_passed_count": 0,
|
||||
"runtime_gate_count": 0,
|
||||
"host_write_authorized_count": 0,
|
||||
"active_response_authorized_count": 0,
|
||||
"secret_value_collection_allowed_count": 0,
|
||||
},
|
||||
"expected_scope_aliases": EXPECTED_SCOPE_ALIASES,
|
||||
"required_owner_fields": REQUIRED_OWNER_FIELDS,
|
||||
"per_host_required_fields": PER_HOST_REQUIRED_FIELDS,
|
||||
"reviewer_validation_checks": REVIEWER_VALIDATION_CHECKS,
|
||||
"outcome_lanes": OUTCOME_LANES,
|
||||
"evidence_slots": [
|
||||
{
|
||||
**slot,
|
||||
"received": False,
|
||||
"accepted": False,
|
||||
"quarantined": False,
|
||||
"next_gate": "owner_provided_redacted_export",
|
||||
}
|
||||
for slot in EVIDENCE_SLOTS
|
||||
],
|
||||
"forbidden_payloads": FORBIDDEN_PAYLOADS,
|
||||
"forbidden_actions": FORBIDDEN_ACTIONS,
|
||||
"execution_boundaries": {
|
||||
"wazuh_api_live_query_authorized": False,
|
||||
"wazuh_agent_reenroll_authorized": False,
|
||||
"wazuh_agent_restart_authorized": False,
|
||||
"wazuh_manager_restart_authorized": False,
|
||||
"wazuh_active_response_authorized": False,
|
||||
"host_write_authorized": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"raw_wazuh_payload_storage_allowed": False,
|
||||
"agent_identity_public_display_allowed": False,
|
||||
"internal_ip_public_display_allowed": False,
|
||||
"kali_active_scan_authorized": False,
|
||||
"runtime_execution_authorized": False,
|
||||
"not_authorization": True,
|
||||
},
|
||||
"no_false_green_rules": [
|
||||
"reviewer validation contract 可見不代表 owner registry export 已收到。",
|
||||
"owner registry export received 不代表 manager_registry_accepted_count 可增加。",
|
||||
"Dashboard 可見、index pattern 三綠勾、HTTP 200 或 transport observed 不可替代 manager registry counts。",
|
||||
"reviewer accepted 只可更新只讀 posture;active response、agent restart、reenroll、host write、secret rotation 或掃描仍需獨立 runtime gate。",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def validate(root: Path) -> None:
|
||||
snapshot = load_json(root / SNAPSHOT_PATH)
|
||||
assert_equal("schema_version", snapshot.get("schema_version"), SCHEMA_VERSION)
|
||||
assert_equal("status", snapshot.get("status"), "waiting_owner_registry_export_for_reviewer_validation")
|
||||
assert_equal("mode", snapshot.get("mode"), "committed_validation_contract_no_runtime_no_secret_collection")
|
||||
assert_equal("scope", snapshot.get("scope"), "wazuh_manager_registry_owner_export_reviewer_validation")
|
||||
assert_equal("expected_scope_aliases", snapshot.get("expected_scope_aliases"), EXPECTED_SCOPE_ALIASES)
|
||||
assert_equal("required_owner_fields", snapshot.get("required_owner_fields"), REQUIRED_OWNER_FIELDS)
|
||||
assert_equal("per_host_required_fields", snapshot.get("per_host_required_fields"), PER_HOST_REQUIRED_FIELDS)
|
||||
assert_equal("reviewer_validation_checks", snapshot.get("reviewer_validation_checks"), REVIEWER_VALIDATION_CHECKS)
|
||||
assert_equal("outcome_lanes", snapshot.get("outcome_lanes"), OUTCOME_LANES)
|
||||
assert_equal("forbidden_payloads", snapshot.get("forbidden_payloads"), FORBIDDEN_PAYLOADS)
|
||||
assert_equal("forbidden_actions", snapshot.get("forbidden_actions"), FORBIDDEN_ACTIONS)
|
||||
|
||||
summary = snapshot.get("summary", {})
|
||||
assert_equal("summary.expected_scope_alias_count", summary.get("expected_scope_alias_count"), len(EXPECTED_SCOPE_ALIASES))
|
||||
assert_equal("summary.required_owner_field_count", summary.get("required_owner_field_count"), len(REQUIRED_OWNER_FIELDS))
|
||||
assert_equal("summary.per_host_required_field_count", summary.get("per_host_required_field_count"), len(PER_HOST_REQUIRED_FIELDS))
|
||||
assert_equal(
|
||||
"summary.reviewer_validation_check_count",
|
||||
summary.get("reviewer_validation_check_count"),
|
||||
len(REVIEWER_VALIDATION_CHECKS),
|
||||
)
|
||||
assert_equal("summary.outcome_lane_count", summary.get("outcome_lane_count"), len(OUTCOME_LANES))
|
||||
assert_equal("summary.evidence_slot_count", summary.get("evidence_slot_count"), len(EVIDENCE_SLOTS))
|
||||
assert_equal("summary.forbidden_payload_count", summary.get("forbidden_payload_count"), len(FORBIDDEN_PAYLOADS))
|
||||
assert_equal("summary.forbidden_action_count", summary.get("forbidden_action_count"), len(FORBIDDEN_ACTIONS))
|
||||
for key in [
|
||||
"owner_registry_export_received_count",
|
||||
"owner_registry_export_accepted_count",
|
||||
"reviewer_validation_ready_count",
|
||||
"reviewer_validation_passed_count",
|
||||
"reviewer_validation_failed_count",
|
||||
"reviewer_validation_quarantined_count",
|
||||
"manager_registry_accepted_count",
|
||||
"post_enable_readback_passed_count",
|
||||
"runtime_gate_count",
|
||||
"host_write_authorized_count",
|
||||
"active_response_authorized_count",
|
||||
"secret_value_collection_allowed_count",
|
||||
]:
|
||||
assert_zero(f"summary.{key}", summary.get(key))
|
||||
|
||||
evidence_slots = snapshot.get("evidence_slots", [])
|
||||
assert_equal("evidence_slots.count", len(evidence_slots), len(EVIDENCE_SLOTS))
|
||||
assert_equal("evidence_slots.ids", [slot.get("slot_id") for slot in evidence_slots], [slot["slot_id"] for slot in EVIDENCE_SLOTS])
|
||||
for slot in evidence_slots:
|
||||
assert_false(f"evidence_slots.{slot.get('slot_id')}.received", slot.get("received"))
|
||||
assert_false(f"evidence_slots.{slot.get('slot_id')}.accepted", slot.get("accepted"))
|
||||
assert_false(f"evidence_slots.{slot.get('slot_id')}.quarantined", slot.get("quarantined"))
|
||||
|
||||
boundaries = snapshot.get("execution_boundaries", {})
|
||||
for key, value in boundaries.items():
|
||||
if key == "not_authorization":
|
||||
assert_equal(f"execution_boundaries.{key}", value, True)
|
||||
else:
|
||||
assert_false(f"execution_boundaries.{key}", value)
|
||||
validate_no_forbidden_text(snapshot)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Wazuh manager registry reviewer validation gate")
|
||||
parser.add_argument("--root", type=Path, default=Path.cwd())
|
||||
parser.add_argument("--output", type=Path)
|
||||
parser.add_argument("--generated-at", default="2026-06-27T15:24:00+08:00")
|
||||
parser.add_argument("--json", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
root = args.root.resolve()
|
||||
if args.output:
|
||||
payload = build_snapshot(args.generated_at)
|
||||
output_path = args.output if args.output.is_absolute() else root / args.output
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
||||
print(f"Wrote {output_path}")
|
||||
elif args.json:
|
||||
print(json.dumps(build_snapshot(args.generated_at), ensure_ascii=False, indent=2, sort_keys=True))
|
||||
else:
|
||||
validate(root)
|
||||
snapshot = load_json(root / SNAPSHOT_PATH)
|
||||
summary = snapshot["summary"]
|
||||
print(
|
||||
"WAZUH_MANAGER_REGISTRY_REVIEWER_VALIDATION_OK "
|
||||
f"aliases={summary['expected_scope_alias_count']} "
|
||||
f"checks={summary['reviewer_validation_check_count']} "
|
||||
f"slots={summary['evidence_slot_count']} "
|
||||
f"received={summary['owner_registry_export_received_count']} "
|
||||
f"accepted={summary['owner_registry_export_accepted_count']} "
|
||||
f"runtime_gate={summary['runtime_gate_count']}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user