330 lines
12 KiB
Python
330 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
IwoooS K8s / ArgoCD production manifest repo-only 清冊。
|
||
|
||
本工具只讀取 repo 內 K8s / ArgoCD / Velero / monitoring source files,整理
|
||
path、SHA256、top-level kind marker 與 owner gate 缺口。它不做 kubectl、
|
||
不連 ArgoCD、不讀 live cluster、不套用 manifest、不 sync、不改 secret。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import subprocess
|
||
import sys
|
||
from collections import Counter
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
TAIPEI = timezone(timedelta(hours=8))
|
||
|
||
SCAN_GROUPS = [
|
||
{
|
||
"group_id": "awoooi_prod",
|
||
"label": "AWOOOI production namespace manifests",
|
||
"root": "k8s/awoooi-prod",
|
||
"control_tier": "C0",
|
||
},
|
||
{
|
||
"group_id": "argocd",
|
||
"label": "ArgoCD application and metrics exposure",
|
||
"root": "k8s/argocd",
|
||
"control_tier": "C0",
|
||
},
|
||
{
|
||
"group_id": "velero",
|
||
"label": "Velero backup / restore manifests",
|
||
"root": "k8s/velero",
|
||
"control_tier": "C0",
|
||
},
|
||
{
|
||
"group_id": "monitoring",
|
||
"label": "K8s monitoring and alert source",
|
||
"root": "k8s/monitoring",
|
||
"control_tier": "C1",
|
||
},
|
||
]
|
||
|
||
REQUIRED_OWNER_FIELDS = [
|
||
"owner_role_or_team",
|
||
"decision",
|
||
"decision_reason",
|
||
"affected_scope",
|
||
"redacted_evidence_refs",
|
||
"argocd_health_readback_ref",
|
||
"argocd_sync_revision_ref",
|
||
"rollback_revision",
|
||
"followup_owner",
|
||
"maintenance_window",
|
||
"validation_plan",
|
||
]
|
||
|
||
EVIDENCE_GAPS = [
|
||
"owner_response",
|
||
"rendered_manifest_diff",
|
||
"argocd_health_readback",
|
||
"argocd_sync_revision",
|
||
"kubectl_dry_run_or_server_validation_plan",
|
||
"rollout_blast_radius",
|
||
"rollback_revision",
|
||
"postcheck_metrics",
|
||
]
|
||
|
||
BLOCKED_ACTIONS = [
|
||
"argocd_sync",
|
||
"kubectl_apply",
|
||
"kubectl_patch",
|
||
"kubectl_delete",
|
||
"helm_upgrade",
|
||
"secret_value_collection",
|
||
"live_cluster_write",
|
||
"manual_pod_restart",
|
||
"scale_workload",
|
||
"change_network_policy",
|
||
"change_rbac",
|
||
"restore_backup",
|
||
"open_runtime_gate",
|
||
]
|
||
|
||
|
||
def git_short_sha(root: Path) -> str:
|
||
try:
|
||
result = subprocess.run(
|
||
["git", "rev-parse", "--short", "HEAD"],
|
||
cwd=root,
|
||
check=True,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
return result.stdout.strip()
|
||
except Exception:
|
||
return "unknown"
|
||
|
||
|
||
def sha256_file(path: Path) -> str:
|
||
digest = hashlib.sha256()
|
||
with path.open("rb") as handle:
|
||
for chunk in iter(lambda: handle.read(65536), b""):
|
||
digest.update(chunk)
|
||
return digest.hexdigest()
|
||
|
||
|
||
def is_yaml_like(path: Path) -> bool:
|
||
return path.suffix in {".yaml", ".yml"}
|
||
|
||
|
||
def extract_top_level_kinds(text: str) -> list[str]:
|
||
kinds: list[str] = []
|
||
for line in text.splitlines():
|
||
if line.startswith("kind:"):
|
||
value = line.split(":", 1)[1].strip()
|
||
if value:
|
||
kinds.append(value)
|
||
return kinds
|
||
|
||
|
||
def gate_tags_for(path: Path, kinds: list[str]) -> list[str]:
|
||
tags: set[str] = set()
|
||
name = str(path).lower()
|
||
kind_set = set(kinds)
|
||
|
||
if "Application" in kind_set:
|
||
tags.add("argocd_application")
|
||
if kind_set & {"Deployment", "CronJob"}:
|
||
tags.add("workload_or_schedule")
|
||
if "Secret" in kind_set or "secret" in name or "credentials" in name:
|
||
tags.add("secret_metadata")
|
||
if kind_set & {"ClusterRole", "ClusterRoleBinding", "ServiceAccount"}:
|
||
tags.add("rbac")
|
||
if "NetworkPolicy" in kind_set:
|
||
tags.add("network_policy")
|
||
if kind_set & {"PodDisruptionBudget", "HorizontalPodAutoscaler", "VerticalPodAutoscaler"}:
|
||
tags.add("availability_and_scaling")
|
||
if kind_set & {"BackupStorageLocation"} or "velero" in name or "backup" in name or "restore" in name:
|
||
tags.add("backup_restore")
|
||
if kind_set & {"PrometheusRule"} or "prometheus" in name or "alert" in name:
|
||
tags.add("monitoring_alerting")
|
||
if path.suffix == ".sh":
|
||
tags.add("apply_capable_script")
|
||
if not tags:
|
||
tags.add("supporting_source")
|
||
return sorted(tags)
|
||
|
||
|
||
def row_for(path: Path, group: dict[str, str], repo_root: Path) -> dict[str, Any]:
|
||
relative = path.relative_to(repo_root).as_posix()
|
||
text = path.read_text(encoding="utf-8", errors="ignore")
|
||
kinds = extract_top_level_kinds(text) if is_yaml_like(path) else []
|
||
gate_tags = gate_tags_for(path, kinds)
|
||
return {
|
||
"path": relative,
|
||
"group_id": group["group_id"],
|
||
"group_label": group["label"],
|
||
"control_tier": group["control_tier"],
|
||
"file_type": "yaml_manifest" if is_yaml_like(path) else "supporting_source",
|
||
"sha256": sha256_file(path),
|
||
"top_level_kinds": kinds,
|
||
"top_level_kind_count": len(kinds),
|
||
"gate_tags": gate_tags,
|
||
"owner_gate_required": True,
|
||
"owner_response_received": False,
|
||
"owner_response_accepted": False,
|
||
"rendered_manifest_diff_ready": False,
|
||
"argocd_health_readback_received": False,
|
||
"argocd_sync_authorized": False,
|
||
"argocd_sync_executed": False,
|
||
"kubectl_action_authorized": False,
|
||
"kubectl_action_executed": False,
|
||
"secret_value_collection_allowed": False,
|
||
"runtime_gate": False,
|
||
"action_buttons_allowed": False,
|
||
}
|
||
|
||
|
||
def build_report(root: Path, generated_at: str | None) -> dict[str, Any]:
|
||
report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
|
||
rows: list[dict[str, Any]] = []
|
||
group_summaries: list[dict[str, Any]] = []
|
||
|
||
for group in SCAN_GROUPS:
|
||
group_root = root / group["root"]
|
||
files = sorted(path for path in group_root.rglob("*") if path.is_file())
|
||
group_rows = [row_for(path, group, root) for path in files]
|
||
rows.extend(group_rows)
|
||
group_summaries.append(
|
||
{
|
||
"group_id": group["group_id"],
|
||
"label": group["label"],
|
||
"root": group["root"],
|
||
"control_tier": group["control_tier"],
|
||
"file_count": len(group_rows),
|
||
"yaml_manifest_file_count": sum(1 for row in group_rows if row["file_type"] == "yaml_manifest"),
|
||
"supporting_source_file_count": sum(1 for row in group_rows if row["file_type"] != "yaml_manifest"),
|
||
"owner_gate_required_count": len(group_rows),
|
||
"owner_response_accepted_count": 0,
|
||
"runtime_gate_count": 0,
|
||
}
|
||
)
|
||
|
||
kind_counter: Counter[str] = Counter()
|
||
gate_tag_counter: Counter[str] = Counter()
|
||
for row in rows:
|
||
kind_counter.update(row["top_level_kinds"])
|
||
gate_tag_counter.update(row["gate_tags"])
|
||
|
||
c0_rows = [row for row in rows if row["control_tier"] == "C0"]
|
||
c1_rows = [row for row in rows if row["control_tier"] == "C1"]
|
||
|
||
return {
|
||
"schema_version": "k8s_argocd_manifest_inventory_v1",
|
||
"generated_at": report_time,
|
||
"git_commit": git_short_sha(root),
|
||
"mode": "repo_only_manifest_inventory",
|
||
"status": "repo_only_inventory_ready_no_live_cluster_read",
|
||
"scan_groups": SCAN_GROUPS,
|
||
"summary": {
|
||
"scan_group_count": len(SCAN_GROUPS),
|
||
"file_count": len(rows),
|
||
"c0_file_count": len(c0_rows),
|
||
"c1_file_count": len(c1_rows),
|
||
"yaml_manifest_file_count": sum(1 for row in rows if row["file_type"] == "yaml_manifest"),
|
||
"supporting_source_file_count": sum(1 for row in rows if row["file_type"] != "yaml_manifest"),
|
||
"unique_kind_count": len(kind_counter),
|
||
"top_level_kind_marker_count": sum(kind_counter.values()),
|
||
"deployment_object_count": kind_counter["Deployment"],
|
||
"cronjob_object_count": kind_counter["CronJob"],
|
||
"secret_object_count": kind_counter["Secret"],
|
||
"network_policy_object_count": kind_counter["NetworkPolicy"],
|
||
"rbac_object_count": kind_counter["ServiceAccount"]
|
||
+ kind_counter["ClusterRole"]
|
||
+ kind_counter["ClusterRoleBinding"],
|
||
"autoscaling_object_count": kind_counter["HorizontalPodAutoscaler"]
|
||
+ kind_counter["VerticalPodAutoscaler"],
|
||
"argocd_application_count": kind_counter["Application"],
|
||
"prometheus_rule_count": kind_counter["PrometheusRule"],
|
||
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
|
||
"evidence_gap_count": len(EVIDENCE_GAPS),
|
||
"blocked_action_count": len(BLOCKED_ACTIONS),
|
||
"owner_response_received_count": 0,
|
||
"owner_response_accepted_count": 0,
|
||
"rendered_manifest_diff_ready_count": 0,
|
||
"argocd_health_readback_received_count": 0,
|
||
"argocd_sync_authorized_count": 0,
|
||
"argocd_sync_executed_count": 0,
|
||
"kubectl_action_authorized_count": 0,
|
||
"kubectl_action_executed_count": 0,
|
||
"live_cluster_read_authorized_count": 0,
|
||
"live_cluster_read_executed_count": 0,
|
||
"secret_value_collection_allowed_count": 0,
|
||
"runtime_gate_count": 0,
|
||
"action_button_count": 0,
|
||
},
|
||
"execution_boundaries": {
|
||
"live_cluster_read_authorized": False,
|
||
"live_cluster_read_executed": False,
|
||
"argocd_api_read_authorized": False,
|
||
"argocd_sync_authorized": False,
|
||
"argocd_sync_executed": False,
|
||
"kubectl_action_authorized": False,
|
||
"kubectl_action_executed": False,
|
||
"manifest_apply_authorized": False,
|
||
"manifest_apply_executed": False,
|
||
"secret_value_collection_allowed": False,
|
||
"production_write_authorized": False,
|
||
"runtime_execution_authorized": False,
|
||
"action_buttons_allowed": False,
|
||
"not_authorization": True,
|
||
},
|
||
"required_owner_fields": REQUIRED_OWNER_FIELDS,
|
||
"evidence_gaps": EVIDENCE_GAPS,
|
||
"blocked_actions": BLOCKED_ACTIONS,
|
||
"kind_counts": dict(sorted(kind_counter.items())),
|
||
"gate_tag_counts": dict(sorted(gate_tag_counter.items())),
|
||
"group_summaries": group_summaries,
|
||
"manifest_rows": rows,
|
||
"next_steps": [
|
||
"將 repo-only 清冊轉成 owner request 草稿;未有 owner response 前不得 sync 或 apply。",
|
||
"未來若要讀 live ArgoCD / K8s,只能另開 readback approval,不得由本清冊推定授權。",
|
||
"任何 ArgoCD sync、kubectl action、secret rotation 或 restore 都必須另開 maintenance window、rollback revision 與 post-check gate。",
|
||
],
|
||
}
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(description="IwoooS K8s / ArgoCD production manifest repo-only 清冊")
|
||
parser.add_argument("--root", default=".", help="repo root")
|
||
parser.add_argument("--output", help="寫出 JSON 報告")
|
||
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
|
||
args = parser.parse_args()
|
||
|
||
root = Path(args.root).resolve()
|
||
report = build_report(root, args.generated_at)
|
||
payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
|
||
|
||
if args.output:
|
||
output = Path(args.output)
|
||
output.parent.mkdir(parents=True, exist_ok=True)
|
||
output.write_text(payload + "\n", encoding="utf-8")
|
||
else:
|
||
print(payload)
|
||
|
||
summary = report["summary"]
|
||
print(
|
||
"K8S_ARGOCD_MANIFEST_INVENTORY_OK "
|
||
f"files={summary['file_count']} "
|
||
f"c0={summary['c0_file_count']} "
|
||
f"yaml={summary['yaml_manifest_file_count']} "
|
||
f"kinds={summary['unique_kind_count']} "
|
||
f"runtime_gate={summary['runtime_gate_count']}",
|
||
file=sys.stderr,
|
||
)
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|