Files
awoooi/scripts/security/package-supply-chain-owner-policy-guard.py

212 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""驗證 Package / Docker 供應鏈 owner policy gate 維持只讀邊界。
本 guard 只讀取 repo 內的 package / Docker 供應鏈 baseline 與 owner
policy gate snapshot不安裝套件、不連外查 CVE / license、不產生 lockfile、
不 pull / build / push image也不修改 workflow、registry 或 production。
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Gap identifiers that both snapshots must list. Compared with ``==`` against
# the JSON arrays, so the order below is part of the expectation.
EXPECTED_BASELINE_GAPS = [
    "python_lockfile_absent",
    "docker_base_images_not_all_digest_pinned",
    "docker_copy_from_images_not_all_digest_pinned",
    "compose_images_not_all_digest_pinned",
    "requirements_unpinned_entries_present",
]

# ``request_id`` values expected in the policy gate's ``owner_policy_requests``
# array, in this exact order.
EXPECTED_REQUEST_IDS = [
    "supply_chain_owner_policy:package_manager_lockfile_owner",
    "supply_chain_owner_policy:python_lockfile_policy",
    "supply_chain_owner_policy:requirements_pin_policy",
    "supply_chain_owner_policy:dockerfile_digest_pin_policy",
    "supply_chain_owner_policy:compose_image_digest_pin_policy",
    "supply_chain_owner_policy:cve_license_sbom_window",
]

# ``required_owner_fields`` expected on the gate itself and on every request.
EXPECTED_OWNER_FIELDS = [
    "package_manager_policy",
    "lockfile_owner",
    "python_lock_policy",
    "docker_base_image_policy",
    "compose_image_policy",
    "registry_owner",
    "cve_scan_window",
    "rollback_owner",
]

# Keys under ``execution_boundaries`` that the guard requires to be ``False``.
FALSE_BOUNDARY_KEYS = [
    "package_installation_allowed",
    "package_upgrade_allowed",
    "lockfile_write_allowed",
    "requirements_pin_change_allowed",
    "external_cve_lookup_allowed",
    "external_license_lookup_allowed",
    "sbom_generation_allowed",
    "docker_pull_allowed",
    "docker_build_allowed",
    "docker_push_allowed",
    "image_tag_change_allowed",
    "image_digest_pin_change_allowed",
    "registry_login_allowed",
    "workflow_modification_authorized",
    "production_deploy_authorized",
    "runtime_execution_authorized",
    "action_buttons_allowed",
    "secret_value_collection_allowed",
]
def fail(message: str) -> None:
    """Stop the guard with a ``BLOCKED``-prefixed exit message.

    Args:
        message: Description of the expectation that was violated.

    Raises:
        SystemExit: Always. Its payload is ``"BLOCKED "`` followed by
            *message*.
    """
    blocked_message = f"BLOCKED {message}"
    raise SystemExit(blocked_message)
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Args:
        path: File to read.

    Returns:
        The decoded top-level JSON object.

    Raises:
        SystemExit: Via ``fail`` when the file cannot be read or decoded, or
            when the top-level JSON value is not an object.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        fail(f"{path}: cannot load JSON snapshot: {exc}")
    # Callers immediately use ``.get`` on the result, so anything other than
    # an object must be rejected here with a readable diagnostic.
    if not isinstance(payload, dict):
        fail(f"{path}: expected top-level JSON object, got {type(payload).__name__}")
    return payload
def assert_equal(label: str, actual: Any, expected: Any) -> None:
    """Fail the guard unless *actual* matches *expected* in type and value.

    Args:
        label: Dotted name of the checked field, used in the failure message.
        actual: Value read from the snapshot.
        expected: Value the guard requires.

    Raises:
        SystemExit: Via ``fail`` when the top-level types differ or the values
            compare unequal.
    """
    # ``bool`` is a subclass of ``int``, so ``0 == False`` and ``1 == True``.
    # A plain ``!=`` would therefore accept ``0`` for a boundary that must be
    # ``False``, or ``False`` for a count that must be ``0``. Requiring the
    # exact same type closes that gap; ``isinstance`` cannot, because
    # ``isinstance(True, int)`` is true. Only the top-level type is compared.
    if type(actual) is not type(expected) or actual != expected:
        fail(f"{label}: expected {expected!r}, got {actual!r}")
def assert_path_exists(root: Path, relative_path: str) -> None:
    """Fail the guard unless *relative_path* exists underneath *root*.

    Args:
        root: Directory the path is resolved against.
        relative_path: Path taken from a snapshot, expected to be relative to
            *root*.

    Raises:
        SystemExit: Via ``fail`` when the path is absolute, contains a ``..``
            segment, or does not exist under *root*.
    """
    candidate = Path(relative_path)
    # Joining an absolute path discards ``root`` entirely, and ``..`` segments
    # can climb out of it. Either would let a reference to a file outside the
    # repository satisfy the check. The test is purely lexical, so symlinks
    # inside the tree are left alone.
    if candidate.is_absolute() or ".." in candidate.parts:
        fail(f"path escapes repository root: {relative_path}")
    if not (root / candidate).exists():
        fail(f"path missing: {relative_path}")
def assert_text_contains(label: str, text: str, expected: str) -> None:
    """Fail the guard unless *expected* occurs as a substring of *text*.

    Args:
        label: Name of the checked document, used in the failure message.
        text: Full text to search.
        expected: Substring that must be present.

    Raises:
        SystemExit: Via ``fail`` when *expected* is not found in *text*.
    """
    if expected in text:
        return
    fail(f"{label}: missing {expected!r}")
def assert_false_boundaries(label: str, data: dict[str, Any]) -> None:
    """Check that the ``execution_boundaries`` object of *data* is fully closed.

    Every key in ``FALSE_BOUNDARY_KEYS`` must be ``False``,
    ``runtime_gate_count`` must be ``0`` and ``not_authorization`` must be
    ``True``. A missing ``execution_boundaries`` object is treated as empty,
    so the first check fails.

    Args:
        label: Prefix for the field names in failure messages.
        data: Decoded snapshot containing ``execution_boundaries``.

    Raises:
        SystemExit: Via ``assert_equal`` on the first mismatching field.
    """
    boundaries = data.get("execution_boundaries", {})
    # One ordered table keeps the check order identical: all boolean
    # boundaries first, then the counter, then the disclaimer flag.
    requirements = [(key, False) for key in FALSE_BOUNDARY_KEYS]
    requirements.append(("runtime_gate_count", 0))
    requirements.append(("not_authorization", True))
    for key, required in requirements:
        assert_equal(f"{label}.execution_boundaries.{key}", boundaries.get(key), required)
def validate_baseline(root: Path) -> dict[str, Any]:
    """Validate the package supply-chain baseline snapshot under *root*.

    The snapshot's identity fields, summary counters and gap list are compared
    against the values pinned in this guard. Every lockfile and manifest path
    the snapshot lists must exist under *root*.

    Args:
        root: Repository root directory.

    Returns:
        The decoded baseline snapshot.

    Raises:
        SystemExit: Via the ``assert_*`` helpers or ``fail`` on the first
            expectation that does not hold.
    """
    snapshot_ref = "docs/security/package-supply-chain-baseline.snapshot.json"
    assert_path_exists(root, snapshot_ref)
    baseline = load_json(root / snapshot_ref)

    summary = baseline.get("summary", {})
    if not isinstance(summary, dict):
        # Without this, ``summary.get`` below raises a bare AttributeError.
        fail(f"baseline.summary: expected object, got {summary!r}")

    for field, expected in (
        ("schema_version", "package_supply_chain_baseline_v1"),
        ("status", "repo_only_inventory_ready_needs_owner_policy"),
        ("mode", "repo_snapshot_only_no_install_no_network_no_cve_scan"),
    ):
        assert_equal(f"baseline.{field}", baseline.get(field), expected)

    # Inventory counters pinned to the values this guard expects.
    for field, expected in (
        ("package_json_count", 6),
        ("pyproject_count", 4),
        ("requirements_file_count", 2),
        ("requirements_unpinned_entry_count", 26),
        ("python_lockfile_count", 0),
        ("docker_base_digest_pinned_count", 0),
        ("compose_digest_pinned_image_ref_count", 0),
        ("gap_count", 5),
    ):
        assert_equal(f"baseline.summary.{field}", summary.get(field), expected)

    assert_equal("baseline.gaps", baseline.get("gaps"), EXPECTED_BASELINE_GAPS)

    # These counters are read from ``summary``, so the label names that
    # location; otherwise a failure message points at a top-level key.
    for field in (
        "owner_response_received_count",
        "owner_response_accepted_count",
        "runtime_gate_count",
        "action_button_count",
    ):
        assert_equal(f"baseline.summary.{field}", summary.get(field), 0)

    for relative_path in baseline.get("lockfiles", []):
        assert_path_exists(root, relative_path)

    for key in (
        "package_json_manifests",
        "pyproject_manifests",
        "requirements_files",
        "dockerfiles",
        "compose_files",
    ):
        for index, item in enumerate(baseline.get(key, [])):
            manifest_path = item.get("path") if isinstance(item, dict) else None
            if not isinstance(manifest_path, str):
                # Report a malformed entry as a guard failure, not a KeyError.
                fail(f"baseline.{key}[{index}].path: expected string path, got {manifest_path!r}")
            assert_path_exists(root, manifest_path)

    return baseline
def validate_policy_gate(root: Path, baseline: dict[str, Any]) -> None:
    """Validate the owner policy gate snapshot and its companion document.

    The gate snapshot must still be in its draft state: identity fields,
    summary counters, gap list, owner fields and request ids match the values
    pinned in this guard. Every request must be unsent and unanswered with its
    runtime gate closed, and every execution boundary must be closed. The
    Markdown document must contain a fixed set of phrases.

    Args:
        root: Repository root directory.
        baseline: Baseline snapshot as returned by ``validate_baseline``; its
            ``summary.gap_count`` is cross-checked against the gate.

    Raises:
        SystemExit: Via the ``assert_*`` helpers or ``fail`` on the first
            expectation that does not hold.
    """
    gate_ref = "docs/security/package-supply-chain-owner-policy-gate.snapshot.json"
    doc_ref = "docs/security/PACKAGE-SUPPLY-CHAIN-OWNER-POLICY-GATE.md"
    assert_path_exists(root, gate_ref)
    assert_path_exists(root, doc_ref)
    gate = load_json(root / gate_ref)

    summary = gate.get("summary", {})
    if not isinstance(summary, dict):
        # Without this, ``summary.get`` below raises a bare AttributeError.
        fail(f"policy_gate.summary: expected object, got {summary!r}")

    pending_status = "draft_waiting_owner_policy_response"

    for field, expected in (
        ("schema_version", "package_supply_chain_owner_policy_gate_v1"),
        ("status", pending_status),
        ("mode", "repo_snapshot_policy_gate_no_install_no_network_no_cve_scan"),
        ("baseline_ref", "docs/security/package-supply-chain-baseline.snapshot.json"),
    ):
        assert_equal(f"policy_gate.{field}", gate.get(field), expected)

    # ``baseline`` is indexed directly on purpose: with ``.get`` a gap count
    # missing from both snapshots would compare ``None`` to ``None`` and pass.
    for field, expected in (
        ("baseline_gap_count", baseline["summary"]["gap_count"]),
        ("owner_policy_request_count", 6),
        ("c0_policy_request_count", 2),
        ("write_capable_policy_request_count", 6),
        ("required_owner_field_count", 8),
        ("reviewer_check_count", 12),
        ("blocked_action_count", 20),
        ("owner_policy_request_sent_count", 0),
        ("owner_response_received_count", 0),
        ("owner_response_accepted_count", 0),
        ("owner_response_rejected_count", 0),
        ("runtime_gate_count", 0),
        ("action_button_count", 0),
    ):
        assert_equal(f"policy_gate.summary.{field}", summary.get(field), expected)

    assert_equal("policy_gate.baseline_gaps", gate.get("baseline_gaps"), EXPECTED_BASELINE_GAPS)
    assert_equal("policy_gate.required_owner_fields", gate.get("required_owner_fields"), EXPECTED_OWNER_FIELDS)

    requests = gate.get("owner_policy_requests")
    if not isinstance(requests, list) or not all(isinstance(item, dict) for item in requests):
        # Report a missing or malformed list as a guard failure rather than
        # letting it surface as KeyError/TypeError.
        fail("policy_gate.owner_policy_requests: expected a list of request objects")
    # ``.get`` yields None for an entry without ``request_id``, which cannot
    # equal any expected id, so the comparison below rejects it.
    assert_equal(
        "policy_gate.owner_policy_request_ids",
        [item.get("request_id") for item in requests],
        EXPECTED_REQUEST_IDS,
    )

    for item in requests:
        request_id = item["request_id"]  # present: the id list matched above
        assert_equal(f"{request_id}.status", item.get("status"), pending_status)
        assert_equal(f"{request_id}.required_owner_fields", item.get("required_owner_fields"), EXPECTED_OWNER_FIELDS)
        for flag in (
            "request_sent",
            "owner_response_received",
            "owner_response_accepted",
            "runtime_gate_open",
        ):
            assert_equal(f"{request_id}.{flag}", item.get(flag), False)
        if not item.get("baseline_gap_refs"):
            fail(f"{request_id}.baseline_gap_refs: expected non-empty list")
        for ref in item.get("evidence_refs", []):
            assert_path_exists(root, ref)
        if not item.get("blocked_until"):
            fail(f"{request_id}.blocked_until: expected non-empty list")

    assert_false_boundaries("policy_gate", gate)

    doc = (root / doc_ref).read_text(encoding="utf-8")
    for phrase in (
        "owner policy gate",
        "Python lockfile",
        "requirements pinning",
        "Docker digest pinning",
        "CVE / license / SBOM",
        "runtime gate",
        "0 / false",
        "不 install、不 upgrade、不 pull image、不 build image、不 push image",
    ):
        assert_text_contains("policy_gate.doc", doc, phrase)
def validate(root: Path) -> None:
    """Run both snapshot validations against the repository at *root*.

    The baseline snapshot is validated first; its decoded content is then
    handed to the policy gate validation for cross-checking.

    Args:
        root: Repository root directory.

    Raises:
        SystemExit: Propagated from either validation on the first failure.
    """
    validate_policy_gate(root, validate_baseline(root))
def main() -> None:
    """Parse the command line, run the guard and print the success marker.

    ``--root`` selects the repository root. When omitted it defaults to the
    third ancestor directory of this script file.

    Raises:
        SystemExit: Propagated from ``validate`` when a check fails, or raised
            by ``argparse`` on invalid arguments.
    """
    default_root = Path(__file__).resolve().parents[2]

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--root",
        type=Path,
        default=default_root,
        help="Repository root. Defaults to the current script's repository.",
    )
    repo_root = parser.parse_args().root.resolve()

    validate(repo_root)
    # Reached only when every check passed; validate() exits otherwise.
    print("PACKAGE_SUPPLY_CHAIN_OWNER_POLICY_GUARD_OK")


if __name__ == "__main__":
    main()