Files
awoooi/scripts/security/soc-siem-kali-wazuh-integration-control.py
Your Name a1bce80842
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
Code Review / ai-code-review (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
feat(iwooos): 整合 SOC SIEM Kali Wazuh 控制
2026-06-18 12:04:06 +08:00

505 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
IwoooS SOC / SIEM / Kali 112 / Wazuh 整合控制矩陣產生器。
本工具只產生 repo 內 snapshot將 Wazuh、Kali 112、Prometheus /
Alertmanager、SigNoz、Sentry、host / gateway / K8s / supply-chain 訊號,
收斂成一份可審核的 SOC 整合控制面。
它不連線 Wazuh、不呼叫 Kali、不 SSH、不讀 live log、不送 Telegram、
不 reload Alertmanager / Prometheus、不啟用 active response、不執行 scan、
不建立 case、不修改 firewall / Nginx / K8s / workflow / secret。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
STANDARD_FRAMEWORKS = [
{
"framework_id": "nist_csf_2_0",
"label": "NIST CSF 2.0",
"mapped_functions": ["Govern", "Identify", "Protect", "Detect", "Respond", "Recover"],
"integration_intent": "將資安監控與回應放進治理、辨識、防護、偵測、回應與復原閉環。",
},
{
"framework_id": "cis_controls_v8_1",
"label": "CIS Controls v8.1",
"mapped_functions": ["Inventory", "Vulnerability", "Audit Log", "Malware", "Recovery", "Access"],
"integration_intent": "把資產、弱點、稽核日誌、惡意程式防護、復原與權限審查納入 IwoooS。",
},
{
"framework_id": "cisa_kev_prioritization",
"label": "CISA KEV 優先化",
"mapped_functions": ["Known exploited vulnerability", "Patch priority", "Owner SLA"],
"integration_intent": "以已知遭利用漏洞作為漏洞修補與維護窗口排序依據。",
},
{
"framework_id": "owasp_asvs_logging",
"label": "OWASP ASVS / Logging",
"mapped_functions": ["Auth log", "Access-control log", "No secret in log", "Verification"],
"integration_intent": "把應用層安全事件、拒絕存取、驗證失敗與敏感資料不落 log 納入前後台驗證。",
},
{
"framework_id": "wazuh_xdr_siem",
"label": "Wazuh XDR / SIEM",
"mapped_functions": ["Agent telemetry", "FIM", "Rule", "Decoder", "Alert", "Active response dry-run"],
"integration_intent": "將 endpoint / host 訊號、檔案完整性、事件規則與 response 邊界納入 IwoooS。",
},
{
"framework_id": "suricata_ndr_ids",
"label": "Suricata NDR / IDS",
"mapped_functions": ["Network detection", "Passive telemetry", "Rule hit", "Future IPS gate"],
"integration_intent": "將網路偵測與封包層線索納入未來 NDR laneIPS 仍需獨立批准。",
},
{
"framework_id": "kali_assessment_tooling",
"label": "Kali assessment tooling",
"mapped_functions": ["Health", "Scope", "Safe crawl", "Tool version", "Finding normalization"],
"integration_intent": "Kali 112 作為安全驗證與工具節點,先接只讀 health / scope / finding contract。",
},
]
CONTROL_DOMAINS = [
("asset_inventory_owner", "資產 / owner / attack surface inventory", "C0"),
("endpoint_log_collection", "Endpoint / host / auth / process log collection", "C0"),
("fim_persistence_detection", "FIM / persistence / malware signal", "C0"),
("vulnerability_kev_prioritization", "CVE / KEV / package / image prioritization", "C0"),
("network_detection_response", "NDR / IDS / firewall / WireGuard / NodePort evidence", "C0"),
("wazuh_siem_correlation", "Wazuh SIEM correlation / rule / decoder readback", "C0"),
("kali_assessment_orchestration", "Kali 112 scanner health / scope / finding normalization", "C0"),
("alert_routing_noise_budget", "Alert routing / dedup / inhibit / noise budget", "C0"),
("incident_case_management", "Incident / case / owner response / escalation", "C0"),
("forensic_evidence_retention", "Forensic evidence retention / redaction / chain of custody", "C0"),
("config_drift_control", "Nginx / host / K8s / runner / workflow config drift", "C0"),
("supply_chain_ci_cd", "Gitea / runner / workflow / Harbor / SBOM / image evidence", "C0"),
("identity_secret_access", "IAM / SSH / sudo / secret / token exposure control", "C1"),
("web_api_appsec_logging", "Web / API appsec logging / auth / rate-limit / headers", "C1"),
("backup_recovery_dr", "Backup / restore / escrow / DR recovery proof", "C1"),
("executive_reporting_metrics", "Dashboard / KPI / LOGBOOK / no-false-green reporting", "C1"),
]
SIGNAL_SOURCES = [
("wazuh_endpoint_alerts", "Wazuh endpoint / agent / FIM alerts"),
("kali_112_health_findings", "Kali 112 health / scope / normalized findings"),
("prometheus_alertmanager", "Prometheus / Alertmanager rules and delivery chain"),
("signoz_apm_traces", "SigNoz traces / metrics / errors"),
("sentry_application_errors", "Sentry application and frontend errors"),
("nginx_gateway_tls", "Nginx / gateway / TLS / ACME / upstream evidence"),
("host_auth_process_network", "Host auth / process / network / package evidence"),
("docker_systemd_runtime", "Docker / systemd / process / port binding evidence"),
("k8s_argocd_gitops", "K8s / ArgoCD / RBAC / NetworkPolicy evidence"),
("gitea_runner_workflow", "Gitea / runner / workflow / deploy key evidence"),
("harbor_container_sbom", "Harbor / registry / container image / SBOM evidence"),
("backup_restore_dr", "Backup / restore / offsite / escrow / cold-start evidence"),
]
CONTROL_CANDIDATES = [
("SOC-P0-01", "資產與 owner 對應表先完整", "C0", "P0"),
("SOC-P0-02", "Wazuh agent / manager / indexer / dashboard health ref 收件", "C0", "P0"),
("SOC-P0-03", "Host auth / process / network / FIM 訊號正規化", "C0", "P0"),
("SOC-P0-04", "Kali 112 health / tool version / scope approval 串接", "C0", "P0"),
("SOC-P0-05", "Kali finding 只接脫敏 envelope不接 raw output", "C0", "P0"),
("SOC-P0-06", "Prometheus / Alertmanager / Telegram 告警鏈 no-false-green", "C0", "P0"),
("SOC-P0-07", "Sentry / SigNoz 與主機入侵線索關聯", "C0", "P0"),
("SOC-P0-08", "Nginx / firewall / route / TLS / gateway drift 關聯到 SIEM", "C0", "P0"),
("SOC-P0-09", "Gitea / runner / workflow / secret metadata 供應鏈關聯", "C0", "P0"),
("SOC-P0-10", "CISA KEV / CVE / package / image 風險排序", "C0", "P0"),
("SOC-P0-11", "Incident case / owner response / escalation queue", "C0", "P0"),
("SOC-P0-12", "鑑識證據、chain of custody、redaction 與保存期", "C0", "P0"),
("SOC-P1-13", "Suricata / NDR passive telemetry lane", "C1", "P1"),
("SOC-P1-14", "Web / API appsec logging 與 OWASP ASVS 驗證", "C1", "P1"),
("SOC-P1-15", "Backup / restore / DR recovery proof 串回 incident", "C1", "P1"),
("SOC-P1-16", "SOC KPI / dashboard / LOGBOOK / executive reporting", "C1", "P1"),
("SOC-P1-17", "SOAR draft / case automation 只建立候選,不自動封鎖", "C1", "P1"),
("SOC-P1-18", "Wazuh active response dry-run 與 rollback gate", "C1", "P1"),
("SOC-P1-19", "Kali active / credentialed scan approval package", "C1", "P1"),
("SOC-P1-20", "NDR / IPS / firewall containment promotion criteria", "C1", "P1"),
]
REQUIRED_OWNER_FIELDS = [
"control_id",
"owner_role",
"owner_team",
"decision",
"decision_reason",
"affected_scope_aliases",
"asset_owner_map_ref",
"signal_source_refs",
"wazuh_manager_ref",
"wazuh_agent_status_refs",
"wazuh_event_refs",
"kali_scope_ref",
"kali_health_ref",
"kali_finding_envelope_ref",
"prometheus_alert_ref",
"alertmanager_route_ref",
"signoz_trace_ref",
"sentry_issue_ref",
"host_forensic_refs",
"fim_or_persistence_refs",
"network_detection_refs",
"gateway_config_diff_refs",
"runner_workflow_refs",
"container_sbom_refs",
"kev_or_cve_refs",
"incident_case_ref",
"severity_mapping_ref",
"confidence_mapping_ref",
"noise_budget_ref",
"dedupe_fingerprint_ref",
"redacted_evidence_refs",
"raw_payload_absence_attestation",
"secret_value_absence_attestation",
"chain_of_custody_ref",
"retention_policy_ref",
"maintenance_window",
"rollback_owner",
"rollback_plan_ref",
"validation_metrics",
"postcheck_owner",
"cross_project_sync_ref",
"followup_owner",
]
REVIEWER_CHECKS = [
"owner role / team 必填",
"asset alias 必須脫敏",
"Wazuh event ref 不得是 raw payload",
"Kali finding 只能接 normalized envelope",
"Kali active scan 必須獨立批准",
"Kali /execute 必須維持封鎖",
"Prometheus / Alertmanager reload 未授權",
"Telegram 實發未授權",
"Sentry / SigNoz 只能作關聯證據",
"route 200 不得當資安通過",
"agent active 不得當入侵清除",
"dashboard up 不得當事件結案",
"host forensic refs 必須含時間窗",
"FIM / persistence refs 必須標示來源",
"network detection refs 不得含 raw packet payload",
"secret value / hash / partial token 一律拒收",
"runner / workflow / deploy key 只收 metadata",
"CVE / KEV 必須有 owner 與 SLA",
"false positive handling 必須可追蹤",
"dedupe fingerprint 必須穩定",
"noise budget 必須列收斂策略",
"case escalation 必須有 owner",
"maintenance window 必須存在",
"rollback owner 必須存在",
"postcheck 必須獨立",
"cross-project sync 必須存在",
"LOGBOOK 更新不能含內部逐字稿",
"前台不得顯示個人 namespace 原文",
"SOAR 只能 draft 不得 auto block",
"active response 先 dry-run",
"NDR / IPS 不能直接進 blocking",
"firewall containment 必須 break-glass 或維護窗口",
"package patch 需維護窗口",
"backup / restore proof 不能用備份存在代替",
"all accepted / authorized / executed counters 必須維持 0",
"runtime gate 必須維持 0",
]
OUTCOME_LANES = [
"waiting_soc_owner_packet",
"request_wazuh_event_supplement",
"request_kali_scope_supplement",
"request_host_forensic_supplement",
"request_alert_chain_supplement",
"request_gateway_diff_supplement",
"request_supply_chain_supplement",
"request_kev_cve_supplement",
"quarantine_secret_or_raw_payload",
"reject_claim_without_cross_evidence",
"ready_for_soc_reviewer_review",
"waiting_runtime_authorization",
"blocked_no_rollback_or_postcheck",
"route_to_high_value_config_gate",
]
BLOCKED_ACTIONS = [
"call_wazuh_api_live",
"enable_wazuh_active_response",
"install_wazuh_agent",
"restart_wazuh_agent",
"change_wazuh_rule",
"change_wazuh_decoder",
"store_raw_wazuh_payload",
"read_wazuh_password",
"disable_wazuh_tls_verification",
"call_kali_scan",
"call_kali_execute",
"run_kali_active_scan",
"run_kali_credentialed_scan",
"run_nmap_scan",
"run_nuclei_scan",
"run_nikto_scan",
"run_trivy_scan_live",
"run_lynis_scan_live",
"update_kali_packages",
"reboot_kali_host",
"change_kali_service",
"ssh_to_host",
"sudo_action",
"read_host_live_log",
"read_host_env",
"write_host_file",
"kill_process",
"isolate_host",
"docker_restart",
"docker_compose_up",
"docker_compose_down",
"systemctl_restart",
"systemctl_stop",
"systemctl_start",
"nginx_test",
"nginx_reload",
"nginx_conf_write",
"certbot_renew",
"dns_change",
"route_change",
"upstream_change",
"firewall_drop",
"firewall_allow",
"port_close",
"port_open",
"wireguard_change",
"nodeport_change",
"network_policy_apply",
"argocd_sync",
"kubectl_apply",
"kubectl_delete",
"helm_upgrade",
"rbac_change",
"k8s_secret_change",
"prometheus_reload",
"alertmanager_reload",
"grafana_dashboard_apply",
"signoz_rule_apply",
"sentry_config_change",
"langfuse_config_change",
"otel_collector_reload",
"receiver_route_change",
"silence_policy_change",
"telegram_send",
"notification_route_change",
"webhook_receiver_change",
"remote_write_change",
"exporter_deploy",
"live_alert_fire",
"alert_chain_smoke_live",
"create_soar_case_live",
"run_soar_playbook",
"auto_block_ip",
"auto_quarantine_endpoint",
"auto_rotate_secret",
"workflow_modification",
"gitea_action_dispatch",
"runner_config_change",
"deploy_key_change",
"webhook_change",
"repo_secret_change",
"secret_store_read",
"collect_password",
"collect_private_key",
"collect_runner_token",
"collect_webhook_secret",
"collect_cookie_or_session",
"collect_secret_hash",
"collect_partial_token",
"store_raw_packet",
"store_raw_log",
"store_unredacted_screenshot",
"database_migration",
"production_write",
"open_runtime_gate",
"add_action_button",
"force_push",
"sync_git_refs",
"switch_github_primary",
"change_codeowners",
"change_branch_protection",
"change_cors",
"disable_rate_limit",
]
EXECUTION_BOUNDARIES = {
"not_authorization": True,
"runtime_execution_authorized": False,
"wazuh_api_live_query_authorized": False,
"wazuh_active_response_authorized": False,
"kali_scan_authorized": False,
"kali_execute_authorized": False,
"active_scan_authorized": False,
"credentialed_scan_authorized": False,
"host_write_authorized": False,
"ssh_write_authorized": False,
"firewall_change_authorized": False,
"nginx_reload_authorized": False,
"prometheus_reload_authorized": False,
"alertmanager_reload_authorized": False,
"telegram_send_authorized": False,
"soar_case_create_authorized": False,
"auto_block_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"runtime_gate_open": False,
"action_buttons_allowed": False,
}
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def build_report(root: Path, generated_at: str | None) -> dict[str, Any]:
report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
c0_candidates = [item for item in CONTROL_CANDIDATES if item[2] == "C0"]
c1_candidates = [item for item in CONTROL_CANDIDATES if item[2] == "C1"]
p0_candidates = [item for item in CONTROL_CANDIDATES if item[3] == "P0"]
p1_candidates = [item for item in CONTROL_CANDIDATES if item[3] == "P1"]
return {
"schema_version": "soc_siem_kali_wazuh_integration_control_v1",
"generated_at": report_time,
"git_commit": git_short_sha(root),
"status": "soc_siem_kali_wazuh_integration_control_ready_no_runtime_action",
"standard_frameworks": [
{"framework_id": item["framework_id"], **item} for item in STANDARD_FRAMEWORKS
],
"control_domains": [
{
"domain_id": domain_id,
"label": label,
"control_tier": tier,
"owner_response_required": True,
"runtime_gate_open": False,
}
for domain_id, label, tier in CONTROL_DOMAINS
],
"signal_sources": [
{
"source_id": source_id,
"label": label,
"redacted_evidence_only": True,
"live_query_authorized": False,
}
for source_id, label in SIGNAL_SOURCES
],
"control_candidates": [
{
"control_id": control_id,
"title": title,
"control_tier": tier,
"priority": priority,
"owner_response_required": True,
"runtime_gate_open": False,
}
for control_id, title, tier, priority in CONTROL_CANDIDATES
],
"required_owner_fields": REQUIRED_OWNER_FIELDS,
"reviewer_checks": [
{"check_id": f"soc_review_{index:02d}", "instruction": instruction}
for index, instruction in enumerate(REVIEWER_CHECKS, start=1)
],
"outcome_lanes": [
{"lane_id": lane_id, "runtime_gate_open": False} for lane_id in OUTCOME_LANES
],
"blocked_actions": BLOCKED_ACTIONS,
"summary": {
"standard_framework_count": len(STANDARD_FRAMEWORKS),
"control_domain_count": len(CONTROL_DOMAINS),
"c0_control_domain_count": sum(1 for _, _, tier in CONTROL_DOMAINS if tier == "C0"),
"c1_control_domain_count": sum(1 for _, _, tier in CONTROL_DOMAINS if tier == "C1"),
"signal_source_count": len(SIGNAL_SOURCES),
"control_candidate_count": len(CONTROL_CANDIDATES),
"c0_control_candidate_count": len(c0_candidates),
"c1_control_candidate_count": len(c1_candidates),
"p0_control_candidate_count": len(p0_candidates),
"p1_control_candidate_count": len(p1_candidates),
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
"reviewer_check_count": len(REVIEWER_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"blocked_action_count": len(BLOCKED_ACTIONS),
"coverage_percent_after_soc_integration_control": 78,
"monitoring_alerting_observability_coverage_percent_after_soc_control": 78,
"security_evidence_tooling_coverage_percent_after_soc_control": 88,
"wazuh_event_ref_received_count": 0,
"kali_scope_ref_accepted_count": 0,
"kali_finding_envelope_accepted_count": 0,
"siem_correlation_rule_accepted_count": 0,
"alert_route_accepted_count": 0,
"incident_case_accepted_count": 0,
"forensic_evidence_accepted_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"active_response_enabled_count": 0,
"kali_active_scan_authorized_count": 0,
"kali_execute_authorized_count": 0,
"prometheus_reload_authorized_count": 0,
"alertmanager_reload_authorized_count": 0,
"telegram_send_authorized_count": 0,
"soar_case_create_authorized_count": 0,
"auto_block_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": EXECUTION_BOUNDARIES,
}
def main() -> int:
parser = argparse.ArgumentParser(description="IwoooS SOC / SIEM / Kali / Wazuh 整合控制矩陣")
parser.add_argument("--root", default=".", help="repo root")
parser.add_argument("--output", help="寫出 JSON 報告")
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
if args.output:
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(payload + "\n", encoding="utf-8")
else:
print(payload)
summary = report["summary"]
print(
"SOC_SIEM_KALI_WAZUH_INTEGRATION_CONTROL_OK "
f"frameworks={summary['standard_framework_count']} "
f"domains={summary['control_domain_count']} "
f"signals={summary['signal_source_count']} "
f"candidates={summary['control_candidate_count']} "
f"runtime_gate={summary['runtime_gate_count']}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())