#!/usr/bin/env python3 """ IwoooS Wazuh 只讀 API production readback 驗收。 本工具只做 HTTPS GET,不收 secret、不連 Wazuh manager、不做 active response、不碰主機。預設模式用於部署後驗收:production API 不可是 404。 若 release 尚未部署,可加 --allow-predeploy-404 記錄目前仍未上線。 """ from __future__ import annotations import argparse import json import re import sys from dataclasses import dataclass from typing import Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen DEFAULT_URL = "https://awoooi.wooo.work/api/iwooos/wazuh" EXPECTED_SCHEMA = "iwooos_wazuh_readonly_status_v1" ALLOWED_STATUSES = { "disabled_waiting_iwooos_wazuh_owner_gate", "misconfigured_missing_server_side_wazuh_env", "wazuh_auth_token_missing", "wazuh_readonly_metadata_unavailable", "wazuh_agent_registry_empty", "wazuh_agent_registry_below_expected", "readonly_metadata_available", } FORBIDDEN_RESPONSE_PATTERNS = [ ("private_ipv4", re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b")), ("known_secret_shape", re.compile(r"Wooo-[0-9]{6,}")), ("token_like_field", re.compile(r'"(?:token|password|secret|private_key|runner_token)"\s*:', re.IGNORECASE)), ( "raw_payload_marker", re.compile(r"raw[_ -]?(?:log|event|alert|body|request|response)", re.IGNORECASE), ), ("legacy_fake_soc_copy", re.compile(r"IWOOOS SOC Dashboard|Threat Blocked|Recent Automated Responses", re.IGNORECASE)), ] @dataclass(frozen=True) class HttpResult: status_code: int body: str content_type: str def fetch_url(url: str, timeout_seconds: float) -> HttpResult: request = Request(url, headers={"Accept": "application/json", "User-Agent": "iwooos-wazuh-readback/1.0"}) try: with urlopen(request, timeout=timeout_seconds) as response: body = response.read().decode("utf-8", errors="replace") return HttpResult( status_code=response.status, body=body, content_type=response.headers.get("content-type", ""), ) except HTTPError as error: body = error.read().decode("utf-8", errors="replace") return HttpResult( status_code=error.code, body=body, content_type=error.headers.get("content-type", ""), ) except URLError as error: raise SystemExit(f"BLOCKED production readback network_error={error}") from error def load_json_body(body: str) -> dict[str, Any]: try: payload = json.loads(body) except json.JSONDecodeError as error: raise SystemExit(f"BLOCKED production readback non_json_response: {error}") from error if not isinstance(payload, dict): raise SystemExit("BLOCKED production readback response_not_object") return payload def require_false(boundaries: dict[str, Any], key: str) -> None: if boundaries.get(key) is not False: raise SystemExit(f"BLOCKED production readback boundaries.{key}: expected false") def require_zero(summary: dict[str, Any], key: str) -> None: if summary.get(key) != 0: raise SystemExit(f"BLOCKED production readback summary.{key}: expected 0") def validate_payload(result: HttpResult, *, allow_predeploy_404: bool) -> dict[str, Any]: if result.status_code == 404 and allow_predeploy_404: payload = load_json_body(result.body) if payload.get("detail") != "Not Found": raise SystemExit("BLOCKED predeploy readback 404 body is not FastAPI Not Found") return { "schema_version": "iwooos_wazuh_production_readback_v1", "status": "predeploy_404_observed", "http_status": result.status_code, "runtime_gate_count": 0, } if result.status_code == 404: raise SystemExit("BLOCKED production readback returned 404; Wazuh FastAPI compatibility route is not deployed") if result.status_code not in {200, 502, 503}: raise SystemExit(f"BLOCKED production readback unexpected_http_status={result.status_code}") for pattern_id, pattern in FORBIDDEN_RESPONSE_PATTERNS: if pattern.search(result.body): raise SystemExit(f"BLOCKED production readback response leaked forbidden pattern {pattern_id}") payload = load_json_body(result.body) if payload.get("schema_version") != EXPECTED_SCHEMA: raise SystemExit(f"BLOCKED production readback schema_version={payload.get('schema_version')!r}") if payload.get("status") not in ALLOWED_STATUSES: raise SystemExit(f"BLOCKED production readback status={payload.get('status')!r}") if payload.get("mode") != "metadata_only_no_active_response_no_raw_payload": raise SystemExit(f"BLOCKED production readback mode={payload.get('mode')!r}") summary = payload.get("summary") if not isinstance(summary, dict): raise SystemExit("BLOCKED production readback summary missing") boundaries = payload.get("boundaries") if not isinstance(boundaries, dict): raise SystemExit("BLOCKED production readback boundaries missing") for key in [ "wazuh_manager_query_accepted_count", "wazuh_event_accepted_count", "host_forensics_accepted_count", "active_response_authorized_count", "host_write_authorized_count", "runtime_gate_count", ]: require_zero(summary, key) for key in [ "active_response_authorized", "host_write_authorized", "secret_value_collection_allowed", "raw_wazuh_payload_storage_allowed", "agent_identity_public_display_allowed", "internal_ip_public_display_allowed", ]: require_false(boundaries, key) if boundaries.get("not_authorization") is not True: raise SystemExit("BLOCKED production readback boundaries.not_authorization: expected true") return { "schema_version": "iwooos_wazuh_production_readback_v1", "status": "production_readback_passed", "http_status": result.status_code, "api_status": payload["status"], "configured": bool(payload.get("configured")), "runtime_gate_count": summary["runtime_gate_count"], } def main() -> int: parser = argparse.ArgumentParser(description="IwoooS Wazuh 只讀 API production readback 驗收") parser.add_argument("--url", default=DEFAULT_URL, help="production Wazuh read-only API URL") parser.add_argument("--timeout-seconds", type=float, default=10.0) parser.add_argument( "--allow-predeploy-404", action="store_true", help="僅供尚未部署時記錄 404 現況;正式部署驗收不得使用", ) parser.add_argument("--json", action="store_true", help="輸出 JSON 摘要") args = parser.parse_args() result = fetch_url(args.url, args.timeout_seconds) report = validate_payload(result, allow_predeploy_404=args.allow_predeploy_404) if args.json: print(json.dumps(report, ensure_ascii=False, sort_keys=True)) else: print( "WAZUH_READONLY_PRODUCTION_READBACK_OK " f"status={report['status']} " f"http={report['http_status']} " f"runtime_gate={report['runtime_gate_count']}" ) return 0 if __name__ == "__main__": sys.exit(main())