Files
awoooi/scripts/security/k8s-argocd-manifest-inventory.py

330 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
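IwoooS K8s / ArgoCD production manifest inventory (repo-only).

This tool only reads the K8s / ArgoCD / Velero / monitoring source files
inside the repo and records each file's path, SHA256, top-level kind markers
and owner-gate gaps. It does not run kubectl, does not connect to ArgoCD,
does not read the live cluster, does not apply manifests, does not sync and
does not change secrets.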
"""
from __future__ import annotations
import argparse
import hashlib
import json
import subprocess
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+8 offset; build_report uses it for the default report timestamp.
TAIPEI = timezone(timedelta(hours=8))
# Directories (relative to the repo root) that build_report walks recursively.
# Every entry carries the same four string keys: group_id, label, root and
# control_tier. build_report counts rows per tier using the literal values
# "C0" and "C1".
# NOTE(review): the meaning of the C0 / C1 tiers is not defined in this file —
# confirm against the owning policy document.
SCAN_GROUPS = [
{
"group_id": "awoooi_prod",
"label": "AWOOOI production namespace manifests",
"root": "k8s/awoooi-prod",
"control_tier": "C0",
},
{
"group_id": "argocd",
"label": "ArgoCD application and metrics exposure",
"root": "k8s/argocd",
"control_tier": "C0",
},
{
"group_id": "velero",
"label": "Velero backup / restore manifests",
"root": "k8s/velero",
"control_tier": "C0",
},
{
"group_id": "monitoring",
"label": "K8s monitoring and alert source",
"root": "k8s/monitoring",
"control_tier": "C1",
},
]
# Field names emitted verbatim under "required_owner_fields" in the report;
# only their count feeds the summary.
REQUIRED_OWNER_FIELDS = [
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"argocd_health_readback_ref",
"argocd_sync_revision_ref",
"rollback_revision",
"followup_owner",
"maintenance_window",
"validation_plan",
]
# Evidence items emitted verbatim under "evidence_gaps" in the report; the
# script never checks whether any of them exist.
EVIDENCE_GAPS = [
"owner_response",
"rendered_manifest_diff",
"argocd_health_readback",
"argocd_sync_revision",
"kubectl_dry_run_or_server_validation_plan",
"rollout_blast_radius",
"rollback_revision",
"postcheck_metrics",
]
# Action names emitted verbatim under "blocked_actions" in the report. This is
# a declarative list only: nothing in this file enforces the block.
BLOCKED_ACTIONS = [
"argocd_sync",
"kubectl_apply",
"kubectl_patch",
"kubectl_delete",
"helm_upgrade",
"secret_value_collection",
"live_cluster_write",
"manual_pod_restart",
"scale_workload",
"change_network_policy",
"change_rbac",
"restore_backup",
"open_runtime_gate",
]
def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit of the checkout at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working directory.
    This is best-effort metadata: any failure (git missing, *root* missing or
    not a repository, non-zero exit status) yields the string ``"unknown"``
    instead of raising.
    """
    command = ["git", "rev-parse", "--short", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except Exception:
        # Deliberately broad: the commit id must never abort the inventory.
        return "unknown"
    return completed.stdout.strip()
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in 64 KiB pieces, so the
    whole content is never held in memory at once.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                # An empty read means end of file.
                break
            hasher.update(block)
    return hasher.hexdigest()
def is_yaml_like(path: Path) -> bool:
    """Tell whether *path* has a ``.yaml`` or ``.yml`` extension.

    Only the final suffix is inspected and the comparison is case-sensitive.
    """
    extension = path.suffix
    return extension == ".yaml" or extension == ".yml"
def _clean_kind_value(raw: str) -> str:
    """Reduce the text following ``kind:`` to the bare kind name."""
    value = raw
    # In YAML a '#' opens a comment only at the start of the value or when it
    # is preceded by whitespace, so "Foo#Bar" is left intact.
    for index, char in enumerate(raw):
        if char == "#" and (index == 0 or raw[index - 1] in " \t"):
            value = raw[:index]
            break
    value = value.strip()
    # Drop one matching pair of surrounding quotes: kind: "Secret" -> Secret.
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        value = value[1:-1].strip()
    return value


def extract_top_level_kinds(text: str) -> list[str]:
    """Collect the values of every column-0 ``kind:`` line in *text*.

    This is a line scan, not a YAML parse: only lines that begin with
    ``kind:`` at column 0 are considered, so indented (nested) ``kind`` keys
    are ignored. Trailing ``# comments`` and a surrounding pair of quotes are
    removed so that ``kind: Secret  # note`` and ``kind: "Secret"`` both yield
    ``Secret``. Lines whose value is empty after cleaning are skipped.

    Returns the kinds in file order; duplicates are kept.
    """
    marker = "kind:"
    kinds: list[str] = []
    for line in text.splitlines():
        if not line.startswith(marker):
            continue
        value = _clean_kind_value(line[len(marker):])
        if value:
            kinds.append(value)
    return kinds
def gate_tags_for(path: Path, kinds: list[str]) -> list[str]:
    """Derive the sorted list of owner-gate tags for one file.

    Tags come from two signals: the manifest kinds found in the file and
    case-insensitive substrings of ``str(path)`` (directory names count, not
    just the file name). A ``.sh`` suffix adds ``apply_capable_script``. When
    nothing matches, the single tag ``supporting_source`` is returned.
    """
    lowered = str(path).lower()
    present = set(kinds)

    def has_kind(*candidates: str) -> bool:
        # True when at least one candidate kind occurs in the file.
        return not present.isdisjoint(candidates)

    def path_mentions(*needles: str) -> bool:
        # True when any needle is a substring of the lower-cased path.
        return any(needle in lowered for needle in needles)

    # NOTE(review): namespaced Role / RoleBinding and StatefulSet / DaemonSet /
    # Job are not matched by the rbac / workload rules — confirm intentional.
    rules = [
        ("argocd_application", has_kind("Application")),
        ("workload_or_schedule", has_kind("Deployment", "CronJob")),
        ("secret_metadata", has_kind("Secret") or path_mentions("secret", "credentials")),
        ("rbac", has_kind("ClusterRole", "ClusterRoleBinding", "ServiceAccount")),
        ("network_policy", has_kind("NetworkPolicy")),
        (
            "availability_and_scaling",
            has_kind("PodDisruptionBudget", "HorizontalPodAutoscaler", "VerticalPodAutoscaler"),
        ),
        (
            "backup_restore",
            has_kind("BackupStorageLocation") or path_mentions("velero", "backup", "restore"),
        ),
        ("monitoring_alerting", has_kind("PrometheusRule") or path_mentions("prometheus", "alert")),
        ("apply_capable_script", path.suffix == ".sh"),
    ]
    matched = [tag for tag, applies in rules if applies]
    if not matched:
        return ["supporting_source"]
    return sorted(matched)
def row_for(path: Path, group: dict[str, str], repo_root: Path) -> dict[str, Any]:
    """Build the inventory row for a single file of a scan group.

    Args:
        path: File to describe; must be located under *repo_root*.
        group: Scan-group entry providing ``group_id``, ``label`` and
            ``control_tier``.
        repo_root: Repository root used to compute the repo-relative path.

    Returns:
        A dict with the POSIX-style relative path, group metadata, file type,
        SHA-256, top-level kinds and gate tags. Every authorization /
        execution flag is a hard-coded ``False`` and ``owner_gate_required``
        is always ``True``: the row records what exists, never an approval.

    Raises:
        ValueError: If *path* is not inside *repo_root*.
    """
    relative_path = path.relative_to(repo_root)
    yaml_like = is_yaml_like(path)
    if yaml_like:
        # Undecodable bytes are dropped rather than failing the whole scan.
        kinds = extract_top_level_kinds(path.read_text(encoding="utf-8", errors="ignore"))
    else:
        # Non-YAML files have no kinds, so there is no reason to decode them.
        kinds = []
    # Tag from the repo-relative path: keyword matching on the absolute path
    # would let the checkout location (e.g. ".../backup/...") leak into every
    # row and make the snapshot differ between machines.
    gate_tags = gate_tags_for(relative_path, kinds)
    return {
        "path": relative_path.as_posix(),
        "group_id": group["group_id"],
        "group_label": group["label"],
        "control_tier": group["control_tier"],
        "file_type": "yaml_manifest" if yaml_like else "supporting_source",
        "sha256": sha256_file(path),
        "top_level_kinds": kinds,
        "top_level_kind_count": len(kinds),
        "gate_tags": gate_tags,
        "owner_gate_required": True,
        "owner_response_received": False,
        "owner_response_accepted": False,
        "rendered_manifest_diff_ready": False,
        "argocd_health_readback_received": False,
        "argocd_sync_authorized": False,
        "argocd_sync_executed": False,
        "kubectl_action_authorized": False,
        "kubectl_action_executed": False,
        "secret_value_collection_allowed": False,
        "runtime_gate": False,
        "action_buttons_allowed": False,
    }
def build_report(root: Path, generated_at: str | None) -> dict[str, Any]:
    """Assemble the full repo-only inventory report for the checkout at *root*.

    Args:
        root: Repository root; each ``SCAN_GROUPS`` entry's ``root`` is
            resolved against it and walked recursively.
        generated_at: Timestamp string to embed as-is. When falsy, the current
            time in the ``TAIPEI`` offset is used, at second precision.

    Returns:
        A JSON-serializable dict holding the per-file rows, per-group
        summaries, aggregate counters, kind and gate-tag histograms, and the
        static owner-field / evidence-gap / blocked-action lists. All
        authorization and execution counters and flags are constants (``0`` /
        ``False``); only ``not_authorization`` is ``True``.
    """
    if generated_at:
        report_time = generated_at
    else:
        report_time = datetime.now(TAIPEI).isoformat(timespec="seconds")

    def yaml_total(entries: list[dict[str, Any]]) -> int:
        # Number of rows classified as YAML manifests.
        return sum(1 for entry in entries if entry["file_type"] == "yaml_manifest")

    def supporting_total(entries: list[dict[str, Any]]) -> int:
        # Number of rows that are anything other than a YAML manifest.
        return sum(1 for entry in entries if entry["file_type"] != "yaml_manifest")

    rows: list[dict[str, Any]] = []
    group_summaries: list[dict[str, Any]] = []
    for group in SCAN_GROUPS:
        group_root = root / group["root"]
        # Sorted so the row order is stable from run to run.
        members = sorted(entry for entry in group_root.rglob("*") if entry.is_file())
        group_rows = [row_for(member, group, root) for member in members]
        rows += group_rows
        group_summaries.append(
            {
                "group_id": group["group_id"],
                "label": group["label"],
                "root": group["root"],
                "control_tier": group["control_tier"],
                "file_count": len(group_rows),
                "yaml_manifest_file_count": yaml_total(group_rows),
                "supporting_source_file_count": supporting_total(group_rows),
                "owner_gate_required_count": len(group_rows),
                "owner_response_accepted_count": 0,
                "runtime_gate_count": 0,
            }
        )

    kind_counter: Counter[str] = Counter(
        kind for row in rows for kind in row["top_level_kinds"]
    )
    gate_tag_counter: Counter[str] = Counter(
        tag for row in rows for tag in row["gate_tags"]
    )
    tier_counter: Counter[str] = Counter(row["control_tier"] for row in rows)

    rbac_kinds = ("ServiceAccount", "ClusterRole", "ClusterRoleBinding")
    autoscaling_kinds = ("HorizontalPodAutoscaler", "VerticalPodAutoscaler")

    # Approval / execution counters are constants: this tool never records one.
    pending_counters = {
        counter_name: 0
        for counter_name in (
            "owner_response_received_count",
            "owner_response_accepted_count",
            "rendered_manifest_diff_ready_count",
            "argocd_health_readback_received_count",
            "argocd_sync_authorized_count",
            "argocd_sync_executed_count",
            "kubectl_action_authorized_count",
            "kubectl_action_executed_count",
            "live_cluster_read_authorized_count",
            "live_cluster_read_executed_count",
            "secret_value_collection_allowed_count",
            "runtime_gate_count",
            "action_button_count",
        )
    }
    summary = {
        "scan_group_count": len(SCAN_GROUPS),
        "file_count": len(rows),
        "c0_file_count": tier_counter["C0"],
        "c1_file_count": tier_counter["C1"],
        "yaml_manifest_file_count": yaml_total(rows),
        "supporting_source_file_count": supporting_total(rows),
        "unique_kind_count": len(kind_counter),
        "top_level_kind_marker_count": sum(kind_counter.values()),
        "deployment_object_count": kind_counter["Deployment"],
        "cronjob_object_count": kind_counter["CronJob"],
        "secret_object_count": kind_counter["Secret"],
        "network_policy_object_count": kind_counter["NetworkPolicy"],
        "rbac_object_count": sum(kind_counter[kind] for kind in rbac_kinds),
        "autoscaling_object_count": sum(kind_counter[kind] for kind in autoscaling_kinds),
        "argocd_application_count": kind_counter["Application"],
        "prometheus_rule_count": kind_counter["PrometheusRule"],
        "required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
        "evidence_gap_count": len(EVIDENCE_GAPS),
        "blocked_action_count": len(BLOCKED_ACTIONS),
        **pending_counters,
    }

    # Every boundary flag is False; the closing marker states that the report
    # itself is not an authorization.
    execution_boundaries: dict[str, bool] = dict.fromkeys(
        (
            "live_cluster_read_authorized",
            "live_cluster_read_executed",
            "argocd_api_read_authorized",
            "argocd_sync_authorized",
            "argocd_sync_executed",
            "kubectl_action_authorized",
            "kubectl_action_executed",
            "manifest_apply_authorized",
            "manifest_apply_executed",
            "secret_value_collection_allowed",
            "production_write_authorized",
            "runtime_execution_authorized",
            "action_buttons_allowed",
        ),
        False,
    )
    execution_boundaries["not_authorization"] = True

    return {
        "schema_version": "k8s_argocd_manifest_inventory_v1",
        "generated_at": report_time,
        "git_commit": git_short_sha(root),
        "mode": "repo_only_manifest_inventory",
        "status": "repo_only_inventory_ready_no_live_cluster_read",
        "scan_groups": SCAN_GROUPS,
        "summary": summary,
        "execution_boundaries": execution_boundaries,
        "required_owner_fields": REQUIRED_OWNER_FIELDS,
        "evidence_gaps": EVIDENCE_GAPS,
        "blocked_actions": BLOCKED_ACTIONS,
        "kind_counts": dict(sorted(kind_counter.items())),
        "gate_tag_counts": dict(sorted(gate_tag_counter.items())),
        "group_summaries": group_summaries,
        "manifest_rows": rows,
        "next_steps": [
            "將 repo-only 清冊轉成 owner request 草稿;未有 owner response 前不得 sync 或 apply。",
            "未來若要讀 live ArgoCD / K8s只能另開 readback approval不得由本清冊推定授權。",
            "任何 ArgoCD sync、kubectl action、secret rotation 或 restore 都必須另開 maintenance window、rollback revision 與 post-check gate。",
        ],
    }
def main() -> int:
    """Run the inventory from the command line and return the exit status.

    Options: ``--root`` (repo root, default ``.``), ``--output`` (path of the
    JSON file to write) and ``--generated-at`` (fixed timestamp string). The
    report is serialized with sorted keys and non-ASCII text kept as-is. With
    ``--output`` it is written to that file (parent directories are created)
    followed by a newline; otherwise it is printed to stdout. A one-line
    status summary always goes to stderr. Always returns 0.
    """
    cli = argparse.ArgumentParser(description="IwoooS K8s / ArgoCD production manifest repo-only 清冊")
    cli.add_argument("--root", default=".", help="repo root")
    cli.add_argument("--output", help="寫出 JSON 報告")
    cli.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    options = cli.parse_args()

    repo_root = Path(options.root).resolve()
    report = build_report(repo_root, options.generated_at)
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)

    if not options.output:
        print(rendered)
    else:
        destination = Path(options.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    totals = report["summary"]
    status_fields = [
        "K8S_ARGOCD_MANIFEST_INVENTORY_OK",
        f"files={totals['file_count']}",
        f"c0={totals['c0_file_count']}",
        f"yaml={totals['yaml_manifest_file_count']}",
        f"kinds={totals['unique_kind_count']}",
        f"runtime_gate={totals['runtime_gate_count']}",
    ]
    # stderr keeps the status line out of the JSON when it is piped from stdout.
    print(" ".join(status_fields), file=sys.stderr)
    return 0
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    raise SystemExit(main())