fix(iwooos): 接上 Wazuh 只讀 API 邊界
This commit is contained in:
@@ -87,6 +87,14 @@ def validate(root: Path) -> None:
|
||||
str(root / "scripts" / "security" / "public-frontend-env-guard.py")
|
||||
)
|
||||
public_frontend_env_guard["validate"](root)
|
||||
wazuh_readonly_route_boundary_guard = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "wazuh-readonly-route-boundary-guard.py")
|
||||
)
|
||||
wazuh_readonly_route_boundary_guard["validate"](root)
|
||||
wazuh_readonly_release_gate = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "wazuh-readonly-release-gate.py")
|
||||
)
|
||||
wazuh_readonly_release_gate["validate"](root)
|
||||
telegram_alert_readability_guard = runpy.run_path(
|
||||
str(root / "scripts" / "security" / "telegram-alert-readability-guard.py")
|
||||
)
|
||||
|
||||
184
scripts/security/wazuh-readonly-production-readback.py
Normal file
184
scripts/security/wazuh-readonly-production-readback.py
Normal file
@@ -0,0 +1,184 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IwoooS Wazuh 只讀 API production readback 驗收。
|
||||
|
||||
本工具只做 HTTPS GET,不收 secret、不連 Wazuh manager、不做 active
|
||||
response、不碰主機。預設模式用於部署後驗收:production API 不可是 404。
|
||||
若 release 尚未部署,可加 --allow-predeploy-404 記錄目前仍未上線。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
|
||||
DEFAULT_URL = "https://awoooi.wooo.work/api/iwooos/wazuh"
|
||||
EXPECTED_SCHEMA = "iwooos_wazuh_readonly_status_v1"
|
||||
ALLOWED_STATUSES = {
|
||||
"disabled_waiting_iwooos_wazuh_owner_gate",
|
||||
"misconfigured_missing_server_side_wazuh_env",
|
||||
"wazuh_auth_token_missing",
|
||||
"wazuh_readonly_metadata_unavailable",
|
||||
"readonly_metadata_available",
|
||||
}
|
||||
FORBIDDEN_RESPONSE_PATTERNS = [
|
||||
("private_ipv4", re.compile(r"\b(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b")),
|
||||
("known_secret_shape", re.compile(r"Wooo-[0-9]{6,}")),
|
||||
("token_like_field", re.compile(r'"(?:token|password|secret|private_key|runner_token)"\s*:', re.IGNORECASE)),
|
||||
("raw_payload_marker", re.compile(r"raw[_ -]?(?:wazuh|payload|log)", re.IGNORECASE)),
|
||||
("legacy_fake_soc_copy", re.compile(r"IWOOOS SOC Dashboard|Threat Blocked|Recent Automated Responses", re.IGNORECASE)),
|
||||
]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HttpResult:
|
||||
status_code: int
|
||||
body: str
|
||||
content_type: str
|
||||
|
||||
|
||||
def fetch_url(url: str, timeout_seconds: float) -> HttpResult:
|
||||
request = Request(url, headers={"Accept": "application/json", "User-Agent": "iwooos-wazuh-readback/1.0"})
|
||||
try:
|
||||
with urlopen(request, timeout=timeout_seconds) as response:
|
||||
body = response.read().decode("utf-8", errors="replace")
|
||||
return HttpResult(
|
||||
status_code=response.status,
|
||||
body=body,
|
||||
content_type=response.headers.get("content-type", ""),
|
||||
)
|
||||
except HTTPError as error:
|
||||
body = error.read().decode("utf-8", errors="replace")
|
||||
return HttpResult(
|
||||
status_code=error.code,
|
||||
body=body,
|
||||
content_type=error.headers.get("content-type", ""),
|
||||
)
|
||||
except URLError as error:
|
||||
raise SystemExit(f"BLOCKED production readback network_error={error}") from error
|
||||
|
||||
|
||||
def load_json_body(body: str) -> dict[str, Any]:
|
||||
try:
|
||||
payload = json.loads(body)
|
||||
except json.JSONDecodeError as error:
|
||||
raise SystemExit(f"BLOCKED production readback non_json_response: {error}") from error
|
||||
if not isinstance(payload, dict):
|
||||
raise SystemExit("BLOCKED production readback response_not_object")
|
||||
return payload
|
||||
|
||||
|
||||
def require_false(boundaries: dict[str, Any], key: str) -> None:
|
||||
if boundaries.get(key) is not False:
|
||||
raise SystemExit(f"BLOCKED production readback boundaries.{key}: expected false")
|
||||
|
||||
|
||||
def require_zero(summary: dict[str, Any], key: str) -> None:
|
||||
if summary.get(key) != 0:
|
||||
raise SystemExit(f"BLOCKED production readback summary.{key}: expected 0")
|
||||
|
||||
|
||||
def validate_payload(result: HttpResult, *, allow_predeploy_404: bool) -> dict[str, Any]:
|
||||
if result.status_code == 404 and allow_predeploy_404:
|
||||
payload = load_json_body(result.body)
|
||||
if payload.get("detail") != "Not Found":
|
||||
raise SystemExit("BLOCKED predeploy readback 404 body is not FastAPI Not Found")
|
||||
return {
|
||||
"schema_version": "iwooos_wazuh_production_readback_v1",
|
||||
"status": "predeploy_404_observed",
|
||||
"http_status": result.status_code,
|
||||
"runtime_gate_count": 0,
|
||||
}
|
||||
|
||||
if result.status_code == 404:
|
||||
raise SystemExit("BLOCKED production readback returned 404; Wazuh FastAPI compatibility route is not deployed")
|
||||
if result.status_code not in {200, 502, 503}:
|
||||
raise SystemExit(f"BLOCKED production readback unexpected_http_status={result.status_code}")
|
||||
|
||||
for pattern_id, pattern in FORBIDDEN_RESPONSE_PATTERNS:
|
||||
if pattern.search(result.body):
|
||||
raise SystemExit(f"BLOCKED production readback response leaked forbidden pattern {pattern_id}")
|
||||
|
||||
payload = load_json_body(result.body)
|
||||
if payload.get("schema_version") != EXPECTED_SCHEMA:
|
||||
raise SystemExit(f"BLOCKED production readback schema_version={payload.get('schema_version')!r}")
|
||||
if payload.get("status") not in ALLOWED_STATUSES:
|
||||
raise SystemExit(f"BLOCKED production readback status={payload.get('status')!r}")
|
||||
if payload.get("mode") != "metadata_only_no_active_response_no_raw_payload":
|
||||
raise SystemExit(f"BLOCKED production readback mode={payload.get('mode')!r}")
|
||||
|
||||
summary = payload.get("summary")
|
||||
if not isinstance(summary, dict):
|
||||
raise SystemExit("BLOCKED production readback summary missing")
|
||||
boundaries = payload.get("boundaries")
|
||||
if not isinstance(boundaries, dict):
|
||||
raise SystemExit("BLOCKED production readback boundaries missing")
|
||||
|
||||
for key in [
|
||||
"wazuh_manager_query_accepted_count",
|
||||
"wazuh_event_accepted_count",
|
||||
"host_forensics_accepted_count",
|
||||
"active_response_authorized_count",
|
||||
"host_write_authorized_count",
|
||||
"runtime_gate_count",
|
||||
]:
|
||||
require_zero(summary, key)
|
||||
|
||||
for key in [
|
||||
"active_response_authorized",
|
||||
"host_write_authorized",
|
||||
"secret_value_collection_allowed",
|
||||
"raw_wazuh_payload_storage_allowed",
|
||||
"agent_identity_public_display_allowed",
|
||||
"internal_ip_public_display_allowed",
|
||||
]:
|
||||
require_false(boundaries, key)
|
||||
|
||||
if boundaries.get("not_authorization") is not True:
|
||||
raise SystemExit("BLOCKED production readback boundaries.not_authorization: expected true")
|
||||
|
||||
return {
|
||||
"schema_version": "iwooos_wazuh_production_readback_v1",
|
||||
"status": "production_readback_passed",
|
||||
"http_status": result.status_code,
|
||||
"api_status": payload["status"],
|
||||
"configured": bool(payload.get("configured")),
|
||||
"runtime_gate_count": summary["runtime_gate_count"],
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="IwoooS Wazuh 只讀 API production readback 驗收")
|
||||
parser.add_argument("--url", default=DEFAULT_URL, help="production Wazuh read-only API URL")
|
||||
parser.add_argument("--timeout-seconds", type=float, default=10.0)
|
||||
parser.add_argument(
|
||||
"--allow-predeploy-404",
|
||||
action="store_true",
|
||||
help="僅供尚未部署時記錄 404 現況;正式部署驗收不得使用",
|
||||
)
|
||||
parser.add_argument("--json", action="store_true", help="輸出 JSON 摘要")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = fetch_url(args.url, args.timeout_seconds)
|
||||
report = validate_payload(result, allow_predeploy_404=args.allow_predeploy_404)
|
||||
if args.json:
|
||||
print(json.dumps(report, ensure_ascii=False, sort_keys=True))
|
||||
else:
|
||||
print(
|
||||
"WAZUH_READONLY_PRODUCTION_READBACK_OK "
|
||||
f"status={report['status']} "
|
||||
f"http={report['http_status']} "
|
||||
f"runtime_gate={report['runtime_gate_count']}"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
186
scripts/security/wazuh-readonly-release-gate.py
Normal file
186
scripts/security/wazuh-readonly-release-gate.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IwoooS Wazuh 只讀 API release gate。
|
||||
|
||||
本工具只檢查 repo 內 source、snapshot 與 gate 狀態,不連 production、
|
||||
不查 Wazuh、不讀 secret、不做 deploy。目的在於固定「source-side 已完成」
|
||||
與「Gitea push / production deploy / production readback 尚未完成」的界線。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
TAIPEI = timezone(timedelta(hours=8))
|
||||
SNAPSHOT_PATH = Path("docs/security/wazuh-readonly-release-gate.snapshot.json")
|
||||
REQUIRED_SOURCE_PATHS = [
|
||||
"apps/api/src/api/v1/iwooos.py",
|
||||
"apps/api/tests/test_iwooos_wazuh_api.py",
|
||||
"apps/web/src/app/api/iwooos/wazuh/route.ts",
|
||||
"docs/security/IWOOOS-WAZUH-READONLY-API-RELEASE-HANDOFF.md",
|
||||
"scripts/security/wazuh-readonly-production-readback.py",
|
||||
"scripts/security/wazuh-readonly-route-boundary-guard.py",
|
||||
]
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return datetime.now(TAIPEI).replace(microsecond=0).isoformat()
|
||||
|
||||
|
||||
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
|
||||
missing_paths = [path for path in REQUIRED_SOURCE_PATHS if not (root / path).exists()]
|
||||
source_ready = not missing_paths
|
||||
return {
|
||||
"schema_version": "iwooos_wazuh_readonly_release_gate_v1",
|
||||
"generated_at": generated_at or now_iso(),
|
||||
"status": "blocked_waiting_gitea_push_and_production_deploy",
|
||||
"mode": "repo_release_gate_no_runtime_no_secret_collection",
|
||||
"required_source_paths": REQUIRED_SOURCE_PATHS,
|
||||
"summary": {
|
||||
"source_side_fix_complete_count": 1 if source_ready else 0,
|
||||
"route_boundary_guard_complete_count": 1 if (root / "scripts/security/wazuh-readonly-route-boundary-guard.py").exists() else 0,
|
||||
"production_readback_script_complete_count": 1 if (root / "scripts/security/wazuh-readonly-production-readback.py").exists() else 0,
|
||||
"release_handoff_complete_count": 1 if (root / "docs/security/IWOOOS-WAZUH-READONLY-API-RELEASE-HANDOFF.md").exists() else 0,
|
||||
"missing_required_source_path_count": len(missing_paths),
|
||||
"gitea_push_complete_count": 0,
|
||||
"production_deploy_complete_count": 0,
|
||||
"production_readback_passed_count": 0,
|
||||
"predeploy_404_observed_count": 1,
|
||||
"wazuh_server_side_env_enabled_count": 0,
|
||||
"wazuh_event_ref_accepted_count": 0,
|
||||
"host_forensics_ref_accepted_count": 0,
|
||||
"active_response_authorized_count": 0,
|
||||
"host_write_authorized_count": 0,
|
||||
"runtime_gate_count": 0,
|
||||
},
|
||||
"release_gates": [
|
||||
{
|
||||
"gate_id": "source_side_fastapi_route",
|
||||
"status": "passed",
|
||||
"required_evidence": "FastAPI /api/iwooos/wazuh 與 /api/v1/iwooos/wazuh source path present",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "source_boundary_guard",
|
||||
"status": "passed",
|
||||
"required_evidence": "wazuh-readonly-route-boundary-guard.py 通過",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "production_readback_script",
|
||||
"status": "passed",
|
||||
"required_evidence": "wazuh-readonly-production-readback.py 可在 release 後不接受 404",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "gitea_branch_push",
|
||||
"status": "blocked_credential_required",
|
||||
"required_evidence": "具備正式權限的 lane 推送或合併 codex/iwooos-wazuh-boundary-guard-20260624",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "production_deploy",
|
||||
"status": "blocked_waiting_release_lane",
|
||||
"required_evidence": "Gitea CD / deploy marker 指向已合併 Wazuh fix 的 commit",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "production_readback",
|
||||
"status": "blocked_waiting_deploy",
|
||||
"required_evidence": "python3 scripts/security/wazuh-readonly-production-readback.py --json 通過且不回 404",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
{
|
||||
"gate_id": "wazuh_live_metadata_env",
|
||||
"status": "blocked_owner_gate_required",
|
||||
"required_evidence": "server-side env 與 owner gate;不得硬編 secret",
|
||||
"runtime_authorized": False,
|
||||
},
|
||||
],
|
||||
"execution_boundaries": {
|
||||
"runtime_execution_authorized": False,
|
||||
"production_deploy_authorized": False,
|
||||
"wazuh_api_live_query_authorized": False,
|
||||
"wazuh_active_response_authorized": False,
|
||||
"host_read_authorized": False,
|
||||
"host_write_authorized": False,
|
||||
"kali_active_scan_authorized": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"raw_wazuh_payload_storage_allowed": False,
|
||||
"internal_ip_public_display_allowed": False,
|
||||
"agent_identity_public_display_allowed": False,
|
||||
"force_push_allowed": False,
|
||||
"not_authorization": True,
|
||||
},
|
||||
"missing_required_source_paths": missing_paths,
|
||||
"operator_interpretation": [
|
||||
"此 gate 通過不代表 production 已部署,只代表 source-side Wazuh read-only API 與 guard 可交接。",
|
||||
"正式 release 前不得用 predeploy 404 當成功,也不得為了修 404 直接改 Nginx、Docker、K8s、firewall 或 Wazuh secret。",
|
||||
"live Wazuh metadata query 必須另走 owner gate 與 server-side env;active response、host write、Kali active scan 仍為 0 / false。",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def load_json(path: Path) -> dict[str, Any]:
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def validate(root: Path) -> None:
|
||||
report = build_report(root)
|
||||
snapshot_path = root / SNAPSHOT_PATH
|
||||
if not snapshot_path.exists():
|
||||
raise SystemExit(f"BLOCKED Wazuh release gate snapshot missing: {SNAPSHOT_PATH.as_posix()}")
|
||||
snapshot = load_json(snapshot_path)
|
||||
|
||||
expected_summary = report["summary"]
|
||||
for key, expected in expected_summary.items():
|
||||
actual = snapshot.get("summary", {}).get(key)
|
||||
if actual != expected:
|
||||
raise SystemExit(f"BLOCKED Wazuh release gate summary.{key}: expected {expected!r}, got {actual!r}")
|
||||
|
||||
if snapshot.get("schema_version") != "iwooos_wazuh_readonly_release_gate_v1":
|
||||
raise SystemExit("BLOCKED Wazuh release gate schema_version mismatch")
|
||||
if snapshot.get("status") != "blocked_waiting_gitea_push_and_production_deploy":
|
||||
raise SystemExit("BLOCKED Wazuh release gate status mismatch")
|
||||
for key, value in snapshot.get("execution_boundaries", {}).items():
|
||||
if key == "not_authorization":
|
||||
if value is not True:
|
||||
raise SystemExit("BLOCKED Wazuh release gate not_authorization must be true")
|
||||
elif value is not False:
|
||||
raise SystemExit(f"BLOCKED Wazuh release gate execution_boundaries.{key}: expected false")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="IwoooS Wazuh 只讀 API release gate")
|
||||
parser.add_argument("--root", default=".", help="repository root")
|
||||
parser.add_argument("--output", help="寫出 JSON 報告")
|
||||
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
|
||||
args = parser.parse_args()
|
||||
|
||||
root = Path(args.root).resolve()
|
||||
report = build_report(root, args.generated_at)
|
||||
if args.output:
|
||||
output = Path(args.output)
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
||||
validate(root)
|
||||
summary = report["summary"]
|
||||
print(
|
||||
"WAZUH_READONLY_RELEASE_GATE_OK "
|
||||
f"source={summary['source_side_fix_complete_count']} "
|
||||
f"push={summary['gitea_push_complete_count']} "
|
||||
f"deploy={summary['production_deploy_complete_count']} "
|
||||
f"readback={summary['production_readback_passed_count']} "
|
||||
f"runtime_gate={summary['runtime_gate_count']}"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
326
scripts/security/wazuh-readonly-route-boundary-guard.py
Normal file
326
scripts/security/wazuh-readonly-route-boundary-guard.py
Normal file
@@ -0,0 +1,326 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
檢查 IwoooS Wazuh 只讀 API 接線邊界。
|
||||
|
||||
本 guard 只掃描 repo 內 source,不連線 Wazuh、不讀主機、不收 secret、
|
||||
不啟用 active response,也不做任何 production runtime 動作。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
TAIPEI = timezone(timedelta(hours=8))
|
||||
NEXT_ROUTE_PATH = Path("apps/web/src/app/api/iwooos/wazuh/route.ts")
|
||||
BACKEND_ROUTE_PATH = Path("apps/api/src/api/v1/iwooos.py")
|
||||
PUBLIC_PAGE_PATH = Path("apps/web/src/app/[locale]/iwooos/page.tsx")
|
||||
PUBLIC_COMPONENT_ROOT = Path("apps/web/src/components/iwooos")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ForbiddenPattern:
|
||||
pattern_id: str
|
||||
pattern: re.Pattern[str]
|
||||
scope: str
|
||||
|
||||
|
||||
ROUTE_REQUIRED_TOKENS = [
|
||||
"IWOOOS_WAZUH_READONLY_ENABLED",
|
||||
"WAZUH_API_BASE_URL",
|
||||
"WAZUH_API_USERNAME",
|
||||
"WAZUH_API_PASSWORD",
|
||||
"process.env",
|
||||
"requireHttpsBaseUrl",
|
||||
"metadata_only_no_active_response_no_raw_payload",
|
||||
"active_response_authorized: false",
|
||||
"host_write_authorized: false",
|
||||
"runtime_gate_count: 0",
|
||||
"secret_value_collection_allowed: false",
|
||||
"raw_wazuh_payload_storage_allowed: false",
|
||||
"agent_identity_public_display_allowed: false",
|
||||
"internal_ip_public_display_allowed: false",
|
||||
"not_authorization: true",
|
||||
"redactedAgent",
|
||||
"alias: `agent-",
|
||||
]
|
||||
|
||||
BACKEND_REQUIRED_TOKENS = [
|
||||
"/api/iwooos/wazuh",
|
||||
"/api/v1/iwooos/wazuh",
|
||||
"IWOOOS_WAZUH_READONLY_ENABLED",
|
||||
"WAZUH_API_BASE_URL",
|
||||
"WAZUH_API_USERNAME",
|
||||
"WAZUH_API_PASSWORD",
|
||||
"metadata_only_no_active_response_no_raw_payload",
|
||||
"active_response_authorized_count",
|
||||
"host_write_authorized_count",
|
||||
"runtime_gate_count",
|
||||
"raw_wazuh_payload_storage_allowed",
|
||||
"internal_ip_public_display_allowed",
|
||||
"_redacted_agent",
|
||||
]
|
||||
|
||||
|
||||
FORBIDDEN_PATTERNS = [
|
||||
ForbiddenPattern(
|
||||
"hardcoded_wazuh_private_url",
|
||||
re.compile(r"https?://(?:10|127|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.[^\s'\"`]+", re.IGNORECASE),
|
||||
"route_and_public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"wazuh_default_api_port_literal",
|
||||
re.compile(r"(?<![A-Za-z0-9_])55000(?![A-Za-z0-9_])"),
|
||||
"route_and_public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"node_tls_reject_unauthorized_disabled",
|
||||
re.compile(r"NODE_TLS_REJECT_UNAUTHORIZED|rejectUnauthorized\s*:\s*false", re.IGNORECASE),
|
||||
"route_and_public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"hardcoded_wazuh_password_assignment",
|
||||
re.compile(r"WAZUH_API_PASSWORD\s*=\s*['\"]|password\s*:\s*['\"]|passwd\s*:\s*['\"]", re.IGNORECASE),
|
||||
"route",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"hardcoded_wazuh_username_assignment",
|
||||
re.compile(r"WAZUH_API_USERNAME\s*=\s*['\"]|username\s*:\s*['\"]|user\s*:\s*['\"]", re.IGNORECASE),
|
||||
"route",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"known_secret_shape",
|
||||
re.compile(r"Wooo-[0-9]{6,}"),
|
||||
"route_and_public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"fake_soc_dashboard_copy",
|
||||
re.compile(
|
||||
r"IWOOOS SOC Dashboard|Threat Blocked|Wazuh Agents Status|Protected Nodes|Recent Automated Responses",
|
||||
re.IGNORECASE,
|
||||
),
|
||||
"public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"fake_cve_or_alert_fixture",
|
||||
re.compile(r"CVE-2025-55182|recentAlerts\s*:\s*\[|vulnerabilities\s*:\s*\[", re.IGNORECASE),
|
||||
"public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"legacy_wazuh_dashboard_component",
|
||||
re.compile(r"FifaWazuhMonitor"),
|
||||
"public_ui",
|
||||
),
|
||||
ForbiddenPattern(
|
||||
"raw_wazuh_payload_public_copy",
|
||||
re.compile(r"rawWazuhPayload|raw_wazuh_payload\s*[:=]\s*[\[{]|raw Wazuh payload\s*[:=]", re.IGNORECASE),
|
||||
"public_ui",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return datetime.now(TAIPEI).replace(microsecond=0).isoformat()
|
||||
|
||||
|
||||
def read_text(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def collect_public_ui_files(root: Path) -> list[Path]:
|
||||
files: list[Path] = []
|
||||
page = root / PUBLIC_PAGE_PATH
|
||||
if page.exists():
|
||||
files.append(PUBLIC_PAGE_PATH)
|
||||
component_root = root / PUBLIC_COMPONENT_ROOT
|
||||
if component_root.exists():
|
||||
files.extend(
|
||||
path.relative_to(root)
|
||||
for path in sorted(component_root.rglob("*"))
|
||||
if path.is_file() and path.suffix in {".ts", ".tsx", ".js", ".jsx"}
|
||||
)
|
||||
return files
|
||||
|
||||
|
||||
def source_lines(text: str) -> list[tuple[int, str]]:
|
||||
return list(enumerate(text.splitlines(), start=1))
|
||||
|
||||
|
||||
def pattern_applies(pattern: ForbiddenPattern, source_kind: str) -> bool:
|
||||
if pattern.scope == "route_and_public_ui":
|
||||
return True
|
||||
if pattern.scope == "route":
|
||||
return source_kind == "route"
|
||||
if pattern.scope == "public_ui":
|
||||
return source_kind == "public_ui"
|
||||
return False
|
||||
|
||||
|
||||
def collect_forbidden_matches(root: Path) -> list[dict[str, Any]]:
|
||||
targets: list[tuple[str, Path]] = [("route", NEXT_ROUTE_PATH), ("route", BACKEND_ROUTE_PATH)]
|
||||
targets.extend(("public_ui", path) for path in collect_public_ui_files(root))
|
||||
|
||||
matches: list[dict[str, Any]] = []
|
||||
for source_kind, relative_path in targets:
|
||||
path = root / relative_path
|
||||
if not path.exists():
|
||||
continue
|
||||
for line_number, line in source_lines(read_text(path)):
|
||||
for forbidden in FORBIDDEN_PATTERNS:
|
||||
if pattern_applies(forbidden, source_kind) and forbidden.pattern.search(line):
|
||||
matches.append(
|
||||
{
|
||||
"path": relative_path.as_posix(),
|
||||
"line": line_number,
|
||||
"pattern_id": forbidden.pattern_id,
|
||||
"source_kind": source_kind,
|
||||
}
|
||||
)
|
||||
return matches
|
||||
|
||||
|
||||
def collect_missing_required_tokens(route_text: str, required_tokens: list[str]) -> list[str]:
|
||||
return [token for token in required_tokens if token not in route_text]
|
||||
|
||||
|
||||
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
|
||||
next_route = root / NEXT_ROUTE_PATH
|
||||
backend_route = root / BACKEND_ROUTE_PATH
|
||||
next_route_present = next_route.exists()
|
||||
backend_route_present = backend_route.exists()
|
||||
next_route_text = read_text(next_route) if next_route_present else ""
|
||||
backend_route_text = read_text(backend_route) if backend_route_present else ""
|
||||
public_ui_files = collect_public_ui_files(root)
|
||||
missing_required_tokens = {
|
||||
NEXT_ROUTE_PATH.as_posix(): collect_missing_required_tokens(next_route_text, ROUTE_REQUIRED_TOKENS),
|
||||
BACKEND_ROUTE_PATH.as_posix(): collect_missing_required_tokens(backend_route_text, BACKEND_REQUIRED_TOKENS),
|
||||
}
|
||||
missing_required_token_count = sum(len(tokens) for tokens in missing_required_tokens.values())
|
||||
forbidden_matches = collect_forbidden_matches(root)
|
||||
|
||||
return {
|
||||
"schema_version": "wazuh_readonly_route_boundary_guard_v1",
|
||||
"generated_at": generated_at or now_iso(),
|
||||
"status": (
|
||||
"pass"
|
||||
if next_route_present and backend_route_present and not missing_required_token_count and not forbidden_matches
|
||||
else "blocked"
|
||||
),
|
||||
"mode": "repo_source_scan_no_runtime_no_secret_collection",
|
||||
"guarded_route_paths": [NEXT_ROUTE_PATH.as_posix(), BACKEND_ROUTE_PATH.as_posix()],
|
||||
"guarded_public_ui_paths": [path.as_posix() for path in public_ui_files],
|
||||
"required_route_tokens": {
|
||||
NEXT_ROUTE_PATH.as_posix(): ROUTE_REQUIRED_TOKENS,
|
||||
BACKEND_ROUTE_PATH.as_posix(): BACKEND_REQUIRED_TOKENS,
|
||||
},
|
||||
"forbidden_pattern_ids": [pattern.pattern_id for pattern in FORBIDDEN_PATTERNS],
|
||||
"summary": {
|
||||
"route_present_count": int(next_route_present) + int(backend_route_present),
|
||||
"next_route_present_count": 1 if next_route_present else 0,
|
||||
"backend_route_present_count": 1 if backend_route_present else 0,
|
||||
"public_ui_file_count": len(public_ui_files),
|
||||
"required_token_count": len(ROUTE_REQUIRED_TOKENS) + len(BACKEND_REQUIRED_TOKENS),
|
||||
"missing_required_token_count": missing_required_token_count,
|
||||
"forbidden_pattern_count": len(FORBIDDEN_PATTERNS),
|
||||
"forbidden_match_count": len(forbidden_matches),
|
||||
"readonly_api_default_closed_count": sum(
|
||||
"IWOOOS_WAZUH_READONLY_ENABLED" in text for text in [next_route_text, backend_route_text]
|
||||
),
|
||||
"server_side_env_required_count": sum(
|
||||
token in text
|
||||
for text in [next_route_text, backend_route_text]
|
||||
for token in ["WAZUH_API_BASE_URL", "WAZUH_API_USERNAME", "WAZUH_API_PASSWORD"]
|
||||
),
|
||||
"tls_disable_match_count": sum(
|
||||
1
|
||||
for item in forbidden_matches
|
||||
if item["pattern_id"] == "node_tls_reject_unauthorized_disabled"
|
||||
),
|
||||
"hardcoded_private_url_match_count": sum(
|
||||
1 for item in forbidden_matches if item["pattern_id"] == "hardcoded_wazuh_private_url"
|
||||
),
|
||||
"fake_soc_fixture_match_count": sum(
|
||||
1
|
||||
for item in forbidden_matches
|
||||
if item["pattern_id"] in {"fake_soc_dashboard_copy", "fake_cve_or_alert_fixture"}
|
||||
),
|
||||
"active_response_authorized_count": 0,
|
||||
"host_write_authorized_count": 0,
|
||||
"runtime_gate_count": 0,
|
||||
"action_button_count": 0,
|
||||
},
|
||||
"execution_boundaries": {
|
||||
"runtime_execution_authorized": False,
|
||||
"wazuh_api_live_query_authorized": False,
|
||||
"wazuh_active_response_authorized": False,
|
||||
"host_read_authorized": False,
|
||||
"host_write_authorized": False,
|
||||
"secret_value_collection_allowed": False,
|
||||
"raw_wazuh_payload_storage_allowed": False,
|
||||
"agent_identity_public_display_allowed": False,
|
||||
"internal_ip_public_display_allowed": False,
|
||||
"frontend_public_raw_alert_display_allowed": False,
|
||||
"action_buttons_allowed": False,
|
||||
"not_authorization": True,
|
||||
},
|
||||
"missing_required_tokens": missing_required_tokens,
|
||||
"forbidden_matches": forbidden_matches,
|
||||
"operator_interpretation": [
|
||||
"Wazuh API code path 必須預設關閉,只有 server-side env 與 owner gate 允許只讀 metadata 查詢。",
|
||||
"不得硬編 Wazuh 內網 URL、使用者、密碼或關閉 TLS 驗證。",
|
||||
"前台不得顯示假 SOC dashboard、假 CVE、假 automated response、raw payload、agent 原名或內網 IP。",
|
||||
"此 guard 通過只代表 source 邊界合格,不代表 Wazuh live query、active response、host containment 或 runtime remediation 已授權。",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def validate(root: Path) -> None:
|
||||
report = build_report(root)
|
||||
errors: list[str] = []
|
||||
if report["summary"]["next_route_present_count"] != 1:
|
||||
errors.append(f"{NEXT_ROUTE_PATH.as_posix()}: Wazuh Next.js 只讀 route 不存在")
|
||||
if report["summary"]["backend_route_present_count"] != 1:
|
||||
errors.append(f"{BACKEND_ROUTE_PATH.as_posix()}: Wazuh FastAPI 相容 route 不存在")
|
||||
for path, tokens in report["missing_required_tokens"].items():
|
||||
for token in tokens:
|
||||
errors.append(f"{path}: 缺少必要只讀邊界 token {token!r}")
|
||||
for item in report["forbidden_matches"]:
|
||||
errors.append(
|
||||
f"{item['path']}:{item['line']}: 命中 Wazuh 邊界 forbidden pattern {item['pattern_id']}"
|
||||
)
|
||||
if errors:
|
||||
raise SystemExit("BLOCKED Wazuh readonly route boundary guard:\n" + "\n".join(f"- {error}" for error in errors))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="檢查 IwoooS Wazuh 只讀 API 接線邊界")
|
||||
parser.add_argument("--root", default=".", help="repository root")
|
||||
parser.add_argument("--output", help="寫出 JSON 報告")
|
||||
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
|
||||
args = parser.parse_args()
|
||||
|
||||
root = Path(args.root).resolve()
|
||||
report = build_report(root, args.generated_at)
|
||||
if args.output:
|
||||
output = Path(args.output)
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
output.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
||||
validate(root)
|
||||
summary = report["summary"]
|
||||
print(
|
||||
"WAZUH_READONLY_ROUTE_BOUNDARY_GUARD_OK "
|
||||
f"route={summary['route_present_count']} "
|
||||
f"public_ui_files={summary['public_ui_file_count']} "
|
||||
f"forbidden={summary['forbidden_match_count']} "
|
||||
f"runtime_gate={summary['runtime_gate_count']}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user