212 lines
10 KiB
Python
212 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""驗證 Package / Docker 供應鏈 owner policy gate 維持只讀邊界。
|
||
|
||
本 guard 只讀取 repo 內的 package / Docker 供應鏈 baseline 與 owner
|
||
policy gate snapshot,不安裝套件、不連外查 CVE / license、不產生 lockfile、
|
||
不 pull / build / push image,也不修改 workflow、registry 或 production。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
EXPECTED_BASELINE_GAPS = [
|
||
"python_lockfile_absent",
|
||
"docker_base_images_not_all_digest_pinned",
|
||
"docker_copy_from_images_not_all_digest_pinned",
|
||
"compose_images_not_all_digest_pinned",
|
||
"requirements_unpinned_entries_present",
|
||
]
|
||
|
||
EXPECTED_REQUEST_IDS = [
|
||
"supply_chain_owner_policy:package_manager_lockfile_owner",
|
||
"supply_chain_owner_policy:python_lockfile_policy",
|
||
"supply_chain_owner_policy:requirements_pin_policy",
|
||
"supply_chain_owner_policy:dockerfile_digest_pin_policy",
|
||
"supply_chain_owner_policy:compose_image_digest_pin_policy",
|
||
"supply_chain_owner_policy:cve_license_sbom_window",
|
||
]
|
||
|
||
EXPECTED_OWNER_FIELDS = [
|
||
"package_manager_policy",
|
||
"lockfile_owner",
|
||
"python_lock_policy",
|
||
"docker_base_image_policy",
|
||
"compose_image_policy",
|
||
"registry_owner",
|
||
"cve_scan_window",
|
||
"rollback_owner",
|
||
]
|
||
|
||
FALSE_BOUNDARY_KEYS = [
|
||
"package_installation_allowed",
|
||
"package_upgrade_allowed",
|
||
"lockfile_write_allowed",
|
||
"requirements_pin_change_allowed",
|
||
"external_cve_lookup_allowed",
|
||
"external_license_lookup_allowed",
|
||
"sbom_generation_allowed",
|
||
"docker_pull_allowed",
|
||
"docker_build_allowed",
|
||
"docker_push_allowed",
|
||
"image_tag_change_allowed",
|
||
"image_digest_pin_change_allowed",
|
||
"registry_login_allowed",
|
||
"workflow_modification_authorized",
|
||
"production_deploy_authorized",
|
||
"runtime_execution_authorized",
|
||
"action_buttons_allowed",
|
||
"secret_value_collection_allowed",
|
||
]
|
||
|
||
|
||
def fail(message: str) -> None:
|
||
raise SystemExit(f"BLOCKED {message}")
|
||
|
||
|
||
def load_json(path: Path) -> dict[str, Any]:
|
||
return json.loads(path.read_text(encoding="utf-8"))
|
||
|
||
|
||
def assert_equal(label: str, actual: Any, expected: Any) -> None:
|
||
if actual != expected:
|
||
fail(f"{label}: expected {expected!r}, got {actual!r}")
|
||
|
||
|
||
def assert_path_exists(root: Path, relative_path: str) -> None:
|
||
if not (root / relative_path).exists():
|
||
fail(f"path missing: {relative_path}")
|
||
|
||
|
||
def assert_text_contains(label: str, text: str, expected: str) -> None:
|
||
if expected not in text:
|
||
fail(f"{label}: missing {expected!r}")
|
||
|
||
|
||
def assert_false_boundaries(label: str, data: dict[str, Any]) -> None:
|
||
boundaries = data.get("execution_boundaries", {})
|
||
for key in FALSE_BOUNDARY_KEYS:
|
||
assert_equal(f"{label}.execution_boundaries.{key}", boundaries.get(key), False)
|
||
assert_equal(f"{label}.execution_boundaries.runtime_gate_count", boundaries.get("runtime_gate_count"), 0)
|
||
assert_equal(f"{label}.execution_boundaries.not_authorization", boundaries.get("not_authorization"), True)
|
||
|
||
|
||
def validate_baseline(root: Path) -> dict[str, Any]:
|
||
baseline_path = root / "docs/security/package-supply-chain-baseline.snapshot.json"
|
||
assert_path_exists(root, "docs/security/package-supply-chain-baseline.snapshot.json")
|
||
baseline = load_json(baseline_path)
|
||
summary = baseline.get("summary", {})
|
||
|
||
assert_equal("baseline.schema_version", baseline.get("schema_version"), "package_supply_chain_baseline_v1")
|
||
assert_equal("baseline.status", baseline.get("status"), "repo_only_inventory_ready_needs_owner_policy")
|
||
assert_equal("baseline.mode", baseline.get("mode"), "repo_snapshot_only_no_install_no_network_no_cve_scan")
|
||
assert_equal("baseline.summary.package_json_count", summary.get("package_json_count"), 6)
|
||
assert_equal("baseline.summary.pyproject_count", summary.get("pyproject_count"), 4)
|
||
assert_equal("baseline.summary.requirements_file_count", summary.get("requirements_file_count"), 2)
|
||
assert_equal("baseline.summary.requirements_unpinned_entry_count", summary.get("requirements_unpinned_entry_count"), 26)
|
||
assert_equal("baseline.summary.python_lockfile_count", summary.get("python_lockfile_count"), 0)
|
||
assert_equal("baseline.summary.docker_base_digest_pinned_count", summary.get("docker_base_digest_pinned_count"), 0)
|
||
assert_equal(
|
||
"baseline.summary.compose_digest_pinned_image_ref_count",
|
||
summary.get("compose_digest_pinned_image_ref_count"),
|
||
0,
|
||
)
|
||
assert_equal("baseline.summary.gap_count", summary.get("gap_count"), 5)
|
||
assert_equal("baseline.gaps", baseline.get("gaps"), EXPECTED_BASELINE_GAPS)
|
||
assert_equal("baseline.owner_response_received_count", summary.get("owner_response_received_count"), 0)
|
||
assert_equal("baseline.owner_response_accepted_count", summary.get("owner_response_accepted_count"), 0)
|
||
assert_equal("baseline.runtime_gate_count", summary.get("runtime_gate_count"), 0)
|
||
assert_equal("baseline.action_button_count", summary.get("action_button_count"), 0)
|
||
|
||
for relative_path in baseline.get("lockfiles", []):
|
||
assert_path_exists(root, relative_path)
|
||
for key in ["package_json_manifests", "pyproject_manifests", "requirements_files", "dockerfiles", "compose_files"]:
|
||
for item in baseline.get(key, []):
|
||
assert_path_exists(root, item["path"])
|
||
return baseline
|
||
|
||
|
||
def validate_policy_gate(root: Path, baseline: dict[str, Any]) -> None:
|
||
gate_path = root / "docs/security/package-supply-chain-owner-policy-gate.snapshot.json"
|
||
doc_path = root / "docs/security/PACKAGE-SUPPLY-CHAIN-OWNER-POLICY-GATE.md"
|
||
assert_path_exists(root, "docs/security/package-supply-chain-owner-policy-gate.snapshot.json")
|
||
assert_path_exists(root, "docs/security/PACKAGE-SUPPLY-CHAIN-OWNER-POLICY-GATE.md")
|
||
gate = load_json(gate_path)
|
||
summary = gate.get("summary", {})
|
||
|
||
assert_equal("policy_gate.schema_version", gate.get("schema_version"), "package_supply_chain_owner_policy_gate_v1")
|
||
assert_equal("policy_gate.status", gate.get("status"), "draft_waiting_owner_policy_response")
|
||
assert_equal("policy_gate.mode", gate.get("mode"), "repo_snapshot_policy_gate_no_install_no_network_no_cve_scan")
|
||
assert_equal("policy_gate.baseline_ref", gate.get("baseline_ref"), "docs/security/package-supply-chain-baseline.snapshot.json")
|
||
assert_equal("policy_gate.summary.baseline_gap_count", summary.get("baseline_gap_count"), baseline["summary"]["gap_count"])
|
||
assert_equal("policy_gate.summary.owner_policy_request_count", summary.get("owner_policy_request_count"), 6)
|
||
assert_equal("policy_gate.summary.c0_policy_request_count", summary.get("c0_policy_request_count"), 2)
|
||
assert_equal("policy_gate.summary.write_capable_policy_request_count", summary.get("write_capable_policy_request_count"), 6)
|
||
assert_equal("policy_gate.summary.required_owner_field_count", summary.get("required_owner_field_count"), 8)
|
||
assert_equal("policy_gate.summary.reviewer_check_count", summary.get("reviewer_check_count"), 12)
|
||
assert_equal("policy_gate.summary.blocked_action_count", summary.get("blocked_action_count"), 20)
|
||
assert_equal("policy_gate.summary.owner_policy_request_sent_count", summary.get("owner_policy_request_sent_count"), 0)
|
||
assert_equal("policy_gate.summary.owner_response_received_count", summary.get("owner_response_received_count"), 0)
|
||
assert_equal("policy_gate.summary.owner_response_accepted_count", summary.get("owner_response_accepted_count"), 0)
|
||
assert_equal("policy_gate.summary.owner_response_rejected_count", summary.get("owner_response_rejected_count"), 0)
|
||
assert_equal("policy_gate.summary.runtime_gate_count", summary.get("runtime_gate_count"), 0)
|
||
assert_equal("policy_gate.summary.action_button_count", summary.get("action_button_count"), 0)
|
||
assert_equal("policy_gate.baseline_gaps", gate.get("baseline_gaps"), EXPECTED_BASELINE_GAPS)
|
||
assert_equal("policy_gate.required_owner_fields", gate.get("required_owner_fields"), EXPECTED_OWNER_FIELDS)
|
||
assert_equal("policy_gate.owner_policy_request_ids", [item["request_id"] for item in gate["owner_policy_requests"]], EXPECTED_REQUEST_IDS)
|
||
|
||
for item in gate["owner_policy_requests"]:
|
||
assert_equal(f"{item['request_id']}.status", item.get("status"), "draft_waiting_owner_policy_response")
|
||
assert_equal(f"{item['request_id']}.required_owner_fields", item.get("required_owner_fields"), EXPECTED_OWNER_FIELDS)
|
||
assert_equal(f"{item['request_id']}.request_sent", item.get("request_sent"), False)
|
||
assert_equal(f"{item['request_id']}.owner_response_received", item.get("owner_response_received"), False)
|
||
assert_equal(f"{item['request_id']}.owner_response_accepted", item.get("owner_response_accepted"), False)
|
||
assert_equal(f"{item['request_id']}.runtime_gate_open", item.get("runtime_gate_open"), False)
|
||
if not item.get("baseline_gap_refs"):
|
||
fail(f"{item['request_id']}.baseline_gap_refs: expected non-empty list")
|
||
for ref in item.get("evidence_refs", []):
|
||
assert_path_exists(root, ref)
|
||
if not item.get("blocked_until"):
|
||
fail(f"{item['request_id']}.blocked_until: expected non-empty list")
|
||
|
||
assert_false_boundaries("policy_gate", gate)
|
||
|
||
doc = doc_path.read_text(encoding="utf-8")
|
||
for text in [
|
||
"owner policy gate",
|
||
"Python lockfile",
|
||
"requirements pinning",
|
||
"Docker digest pinning",
|
||
"CVE / license / SBOM",
|
||
"runtime gate",
|
||
"0 / false",
|
||
"不 install、不 upgrade、不 pull image、不 build image、不 push image",
|
||
]:
|
||
assert_text_contains("policy_gate.doc", doc, text)
|
||
|
||
|
||
def validate(root: Path) -> None:
|
||
baseline = validate_baseline(root)
|
||
validate_policy_gate(root, baseline)
|
||
|
||
|
||
def main() -> None:
|
||
parser = argparse.ArgumentParser(description=__doc__)
|
||
parser.add_argument(
|
||
"--root",
|
||
default=Path(__file__).resolve().parents[2],
|
||
type=Path,
|
||
help="Repository root. Defaults to the current script's repository.",
|
||
)
|
||
args = parser.parse_args()
|
||
validate(args.root.resolve())
|
||
print("PACKAGE_SUPPLY_CHAIN_OWNER_POLICY_GUARD_OK")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|