Files
ewoooc/scripts/ops/report_source_deploy_runtime_truth.py
ogt edc3f1fdc3
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
強化 P4 source deployment runtime truth 回報
2026-07-02 15:04:41 +08:00

403 lines
14 KiB
Python

#!/usr/bin/env python3
"""Machine-readable P4 source, deployment, and runtime truth report.
This report deliberately keeps source-control success, deployment file
readback, and production runtime health as separate evidence layers. It is
read-only: no database writes, no container lifecycle actions, no secret reads.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import subprocess
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Any
from scripts.ops.check_production_version_truth import parse_config_version
# Default source root: parents[2] of this file's resolved path.
ROOT = Path(__file__).resolve().parents[2]
# Default endpoint queried for the production runtime health payload.
DEFAULT_HEALTH_URL = "https://mo.wooo.work/health"
# Default Gitea remote URL handed to `git ls-remote`.
DEFAULT_GITEA_REMOTE = "ssh://git@192.168.0.110:2222/wooo/ewoooc.git"
# Paths, relative to the source root, that are checked with `git status`
# and hashed for the deployment readback when no --tracked-file is given.
DEFAULT_TRACKED_FILES = (
    "config.py",
    "scripts/ops/check_production_version_truth.py",
    "scripts/ops/report_source_deploy_runtime_truth.py",
    "docs/guides/pchome_ai_automation_priority_backlog.md",
    "docs/AI_INTELLIGENCE_MODULE_SOT.md",
)
# Injectable command runner: (argv, working directory) -> command output text.
CommandRunner = Callable[[list[str], Path], str]
# Injectable health fetcher: (url, timeout) -> decoded JSON object.
HealthFetcher = Callable[[str, float], dict[str, Any]]
def run_command(args: list[str], cwd: Path) -> str:
    """Run *args* inside *cwd* and return its stdout, stripped of outer whitespace.

    Both stdout and stderr are captured as text.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    completed = subprocess.run(
        args,
        cwd=cwd,
        check=True,
        text=True,
        capture_output=True,  # same as stdout=PIPE, stderr=PIPE
    )
    return completed.stdout.strip()
def fetch_json(url: str, timeout: float) -> dict[str, Any]:
    """Fetch *url* and return its UTF-8 body decoded as a JSON object.

    Raises:
        ValueError: If the body parses as JSON but is not an object. Invalid
            JSON surfaces as json.JSONDecodeError, which is a ValueError too.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        body_text = response.read().decode("utf-8")
    parsed = json.loads(body_text)
    if isinstance(parsed, dict):
        return parsed
    raise ValueError("runtime health payload must be a JSON object")
def _run_git(args: list[str], root: Path, runner: CommandRunner) -> str:
    """Invoke ``git`` with *args* through *runner*, using *root* as the working directory."""
    git_command = ["git"]
    git_command.extend(args)
    return runner(git_command, root)
def parse_ls_remote(output: str) -> dict[str, str]:
    """Map each ref name found in ``git ls-remote`` output to its object id.

    Lines with fewer than two whitespace-separated fields are ignored. When a
    ref name appears more than once, the last occurrence wins.
    """
    split_lines = (line.split() for line in output.splitlines())
    return {fields[1]: fields[0] for fields in split_lines if len(fields) >= 2}
def read_working_tree_config_version(root: Path) -> str:
    """Return the version parsed from the ``config.py`` currently on disk under *root*."""
    config_path = root / "config.py"
    config_source = config_path.read_text(encoding="utf-8")
    return parse_config_version(config_source)
def read_head_config_version(root: Path, runner: CommandRunner) -> str:
    """Return the version parsed from ``config.py`` as committed at ``HEAD``.

    The committed content is obtained with ``git show HEAD:config.py`` via *runner*.
    """
    committed_source = _run_git(["show", "HEAD:config.py"], root, runner)
    return parse_config_version(committed_source)
def collect_source_control(
    root: Path,
    gitea_remote: str,
    tracked_files: Iterable[str],
    runner: CommandRunner = run_command,
) -> dict[str, Any]:
    """Gather the source-control layer of the report.

    Reads the ``main``/``dev`` heads of ``origin`` and of *gitea_remote* with
    ``git ls-remote``, the local branch and HEAD, the config version both on
    disk and at HEAD, and the porcelain status of *tracked_files*.

    Args:
        root: Working directory for every git invocation.
        gitea_remote: Remote URL queried in addition to ``origin``.
        tracked_files: Paths passed to ``git status --porcelain --``.
        runner: Command runner used for all git calls.

    Returns:
        A dict with ``truth_source``, ``local``, ``origin`` and ``gitea_ssh`` entries.
    """
    branch_refs = ("refs/heads/main", "refs/heads/dev")

    def git(*git_args: str) -> str:
        # Thin shortcut so each call below reads like the git command line.
        return _run_git(list(git_args), root, runner)

    origin_heads = parse_ls_remote(git("ls-remote", "origin", *branch_refs))
    gitea_heads = parse_ls_remote(git("ls-remote", gitea_remote, *branch_refs))
    head_sha = git("rev-parse", "HEAD")

    local_state = {
        "branch": git("rev-parse", "--abbrev-ref", "HEAD"),
        "head": head_sha,
        "working_tree_config_version": read_working_tree_config_version(root),
        "head_config_version": read_head_config_version(root, runner),
        "tracked_file_status": git("status", "--porcelain", "--", *tracked_files).splitlines(),
    }
    origin_state = {
        "remote": git("remote", "get-url", "origin"),
        "main": origin_heads.get("refs/heads/main"),
        "dev": origin_heads.get("refs/heads/dev"),
    }
    gitea_state = {
        "remote": gitea_remote,
        "main": gitea_heads.get("refs/heads/main"),
        "dev": gitea_heads.get("refs/heads/dev"),
    }
    return {
        "truth_source": "Gitea",
        "local": local_state,
        "origin": origin_state,
        "gitea_ssh": gitea_state,
    }
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks so it is never held in memory whole.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(chunk_size):
            hasher.update(block)
    return hasher.hexdigest()
def collect_deployment_files(root: Path, tracked_files: Iterable[str]) -> dict[str, Any]:
    """Gather the deployment layer: existence, SHA-256 and size of each tracked file.

    Args:
        root: Directory the relative paths in *tracked_files* are resolved against.
        tracked_files: Relative paths to read back.

    Returns:
        A dict with ``truth_source``, ``source_root``, ``tracked_file_count`` and
        ``files``. Entries for paths that are not regular files carry ``None``
        for ``sha256`` and ``size_bytes``.
    """

    def describe(relpath: str) -> dict[str, Any]:
        # One readback record per tracked path.
        candidate = root / relpath
        if not candidate.is_file():
            return {"path": relpath, "exists": False, "sha256": None, "size_bytes": None}
        return {
            "path": relpath,
            "exists": True,
            "sha256": sha256_file(candidate),
            "size_bytes": candidate.stat().st_size,
        }

    entries = [describe(relpath) for relpath in tracked_files]
    return {
        "truth_source": "deployed_file_hash_readback",
        "source_root": str(root),
        "tracked_file_count": len(entries),
        "files": entries,
    }
def collect_container_state(
    container_name: str | None,
    root: Path,
    runner: CommandRunner = run_command,
) -> dict[str, Any]:
    """Read back a container's state through ``docker inspect``.

    Args:
        container_name: Container to inspect; a falsy value skips the readback.
        root: Working directory handed to *runner*.
        runner: Command runner used for the ``docker inspect`` call.

    Returns:
        A ``{"requested": False, ...}`` placeholder when no name was given,
        otherwise the container's status, running flag and health status.
    """
    if container_name:
        inspect_cmd = ["docker", "inspect", "--format", "{{json .State}}", container_name]
        state = json.loads(runner(inspect_cmd, root))
        # "Health" may be absent or null in the state; treat both as empty.
        health = state.get("Health") or {}
        return {
            "requested": True,
            "name": container_name,
            "status": state.get("Status"),
            "running": bool(state.get("Running")),
            "health_status": health.get("Status"),
        }
    return {"requested": False, "name": None, "status": "skipped"}
def collect_runtime(
    health_url: str,
    timeout: float,
    root: Path,
    container_name: str | None = None,
    runner: CommandRunner = run_command,
    health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
    """Gather the runtime layer: the health payload plus optional container state.

    Args:
        health_url: URL passed to *health_fetcher*.
        timeout: Timeout passed to *health_fetcher*.
        root: Working directory for the container readback command.
        container_name: Container to inspect, or ``None`` to skip that readback.
        runner: Command runner used for the container readback.
        health_fetcher: Callable returning the health payload as a dict.

    Returns:
        A dict with ``truth_source``, ``health_url``, the ``status``/``database``/
        ``version`` fields of the health payload, and the container readback.
    """
    payload = health_fetcher(health_url, timeout)
    health_view = {field: payload.get(field) for field in ("status", "database", "version")}
    container_view = collect_container_state(container_name, root, runner)
    return {
        "truth_source": "production_runtime_readback",
        "health_url": health_url,
        "health": health_view,
        "container": container_view,
    }
def safety_gates() -> dict[str, Any]:
    """Return the fixed safety-gate declarations embedded in every report."""
    policy_flags = {
        "github_freeze_enforced": True,
        "github_allowed_actions": 0,
        "momo_db_protected": True,
        "remove_orphans_forbidden": True,
        "version_bump_forbidden_in_this_lane": True,
    }
    performed_actions = {
        "secret_read_performed": False,
        "database_write_performed": False,
        "destructive_container_action_performed": False,
    }
    return {**policy_flags, **performed_actions}
def summarize(report: dict[str, Any]) -> dict[str, Any]:
    """Reduce the collected evidence layers to boolean verdicts.

    Args:
        report: Dict holding ``source_control``, ``deployment``, ``runtime``
            and ``safety_gates`` sections.

    Returns:
        A dict of per-layer flags plus ``success``, which is true only when
        every layer check and every evaluated safety gate passes.
    """
    source = report["source_control"]
    local = source["local"]
    head_sha = local["head"]
    remote_heads = (
        source["origin"]["main"],
        source["origin"]["dev"],
        source["gitea_ssh"]["main"],
        source["gitea_ssh"]["dev"],
    )
    # Every remote branch head must equal the local HEAD.
    source_control_ok = not any(remote != head_sha for remote in remote_heads)
    tracked_files_committed = not local["tracked_file_status"]

    runtime = report["runtime"]
    health = runtime["health"]
    production_version = health["version"]
    head_version = local["head_config_version"]
    working_tree_version = local["working_tree_config_version"]
    production_health_ok = health["status"] == "healthy"
    production_version_matches_head = production_version == head_version
    # Drift exists unless working tree, HEAD and production all agree.
    version_bump_detected = not (working_tree_version == head_version == production_version)

    deployment_hash_readback_ok = True
    for entry in report["deployment"]["files"]:
        if not (entry["exists"] and entry["sha256"]):
            deployment_hash_readback_ok = False
            break

    container = runtime["container"]
    if container["requested"]:
        # A container without a health status (None) is still accepted.
        container_readback_ok = (
            container.get("running") is True
            and container.get("status") == "running"
            and container.get("health_status") in {"healthy", None}
        )
    else:
        container_readback_ok = True

    gates = report["safety_gates"]
    safety_ok = (
        gates["github_freeze_enforced"]
        and gates["github_allowed_actions"] == 0
        and gates["momo_db_protected"]
        and gates["remove_orphans_forbidden"]
        and not (
            gates["secret_read_performed"]
            or gates["database_write_performed"]
            or gates["destructive_container_action_performed"]
        )
    )

    verdicts = [
        source_control_ok,
        tracked_files_committed,
        deployment_hash_readback_ok,
        production_health_ok,
        production_version_matches_head,
        not version_bump_detected,
        container_readback_ok,
        safety_ok,
    ]
    return {
        "source_control_ok": source_control_ok,
        "tracked_files_committed": tracked_files_committed,
        "deployment_hash_readback_ok": deployment_hash_readback_ok,
        "production_health_ok": production_health_ok,
        "production_version_matches_head": production_version_matches_head,
        "version_bump_detected": version_bump_detected,
        "container_readback_ok": container_readback_ok,
        "github_freeze_enforced": gates["github_freeze_enforced"],
        "momo_db_protected": gates["momo_db_protected"],
        "truth_layers_separated": True,
        "success": all(verdicts),
    }
def evaluate(report: dict[str, Any]) -> tuple[bool, list[str]]:
    """Turn ``report["summary"]`` into a pass/fail verdict plus blocker messages.

    Args:
        report: Report dict whose ``summary`` entry holds the boolean flags
            checked below.

    Returns:
        ``(ok, errors)`` where ``ok`` is true exactly when ``errors`` is empty.
        ``errors`` lists one message per failed flag, in a fixed order.
    """
    summary = report["summary"]
    # (summary key, truthiness required to pass, blocker message when it differs)
    expectations = (
        (
            "source_control_ok",
            True,
            "local HEAD, origin main/dev, and Gitea SSH main/dev are not aligned",
        ),
        (
            "tracked_files_committed",
            True,
            "tracked deployment files have uncommitted source-control changes",
        ),
        (
            "deployment_hash_readback_ok",
            True,
            "one or more tracked deployment files are missing or lack hash readback",
        ),
        ("production_health_ok", True, "production runtime health is not healthy"),
        (
            "production_version_matches_head",
            True,
            "production /health version does not match HEAD config.py",
        ),
        ("version_bump_detected", False, "unexpected version drift or bump detected"),
        (
            "container_readback_ok",
            True,
            "container readback was requested but did not return running/healthy",
        ),
        ("github_freeze_enforced", True, "GitHub freeze is not enforced"),
        ("momo_db_protected", True, "momo-db protection gate is not set"),
    )
    errors = [
        message
        for key, required, message in expectations
        if bool(summary[key]) is not required
    ]
    # The aggregate "success" flag can be false for reasons that have no
    # dedicated message above (for example a safety gate that is only folded
    # into the aggregate). Without this catch-all such a report would still be
    # labelled PASS. A summary without a "success" key is treated as before.
    if not errors and not summary.get("success", True):
        errors.append(
            "summary reports success=false without a specific blocker; check safety gates"
        )
    return not errors, errors
def build_report(
    *,
    root: Path = ROOT,
    health_url: str = DEFAULT_HEALTH_URL,
    timeout: float = 10.0,
    gitea_remote: str = DEFAULT_GITEA_REMOTE,
    tracked_files: Iterable[str] = DEFAULT_TRACKED_FILES,
    container_name: str | None = None,
    runner: CommandRunner = run_command,
    health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
    """Collect every evidence layer and assemble the full report.

    The source-control, deployment and runtime layers are gathered in that
    order, followed by the safety gates, the derived ``summary``, and finally
    the ``result`` (``"PASS"`` or ``"BLOCKED"``) with its ``errors`` list.

    All parameters are keyword-only; *runner* and *health_fetcher* allow the
    external commands and the HTTP fetch to be substituted.
    """
    # Materialise once: the iterable is consumed by two separate collectors.
    files = tuple(tracked_files)

    report: dict[str, Any] = {"policy": "p4_source_deployment_runtime_truth_v1"}
    report["source_control"] = collect_source_control(root, gitea_remote, files, runner)
    report["deployment"] = collect_deployment_files(root, files)
    report["runtime"] = collect_runtime(
        health_url, timeout, root, container_name, runner, health_fetcher
    )
    report["safety_gates"] = safety_gates()
    report["summary"] = summarize(report)

    passed, blockers = evaluate(report)
    report["result"] = "PASS" if passed else "BLOCKED"
    report["errors"] = blockers
    return report
def _short_sha(value: str | None) -> str:
    """Abbreviate *value* to its first 12 characters; ``None`` or empty gives ``"missing"``."""
    if not value:
        return "missing"
    return value[:12]
def format_text(report: dict[str, Any]) -> str:
    """Render *report* as the line-oriented, human-readable text summary.

    The output starts with a fixed header line, continues with one ``- key: value``
    line per reported field, and ends with one ``- blocker:`` line per entry in
    ``report["errors"]``.
    """
    source = report["source_control"]
    runtime = report["runtime"]
    summary = report["summary"]
    container = runtime["container"]

    local, origin, gitea = source["local"], source["origin"], source["gitea_ssh"]
    health = runtime["health"]
    container_label = container.get("name") or "not_requested"
    # Booleans are printed lowercase ("true"/"false").
    committed = str(summary["tracked_files_committed"]).lower()
    bumped = str(summary["version_bump_detected"]).lower()
    separated = str(summary["truth_layers_separated"]).lower()

    rendered = [
        "source_deploy_runtime_truth:",
        f"- policy: {report['policy']}",
        f"- result: {report['result']}",
        f"- local_branch: {local['branch']}",
        f"- local_head: {_short_sha(local['head'])}",
        f"- origin_main: {_short_sha(origin['main'])}",
        f"- origin_dev: {_short_sha(origin['dev'])}",
        f"- gitea_main: {_short_sha(gitea['main'])}",
        f"- gitea_dev: {_short_sha(gitea['dev'])}",
        f"- tracked_files_committed: {committed}",
        f"- production_health: {health['status']} {health['database']} {health['version']}",
        f"- version_bump_detected: {bumped}",
        f"- deployment_files_hashed: {report['deployment']['tracked_file_count']}",
        f"- container: {container_label} {container.get('status')}",
        f"- truth_layers_separated: {separated}",
    ]
    rendered.extend(f"- blocker: {error}" for error in report["errors"])
    return "\n".join(rendered)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` when the report result is ``PASS``, ``1`` when the report was
        built but is blocked, and ``2`` when building the report raised one of
        the handled errors.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--source-root", type=Path, default=ROOT)
    parser.add_argument("--health-url", default=DEFAULT_HEALTH_URL)
    parser.add_argument("--timeout", type=float, default=10.0)
    parser.add_argument("--gitea-remote", default=DEFAULT_GITEA_REMOTE)
    parser.add_argument("--container-name")
    parser.add_argument("--tracked-file", action="append", dest="tracked_files")
    parser.add_argument("--json", action="store_true", help="Print machine-readable report")
    args = parser.parse_args(argv)

    tracked_files = DEFAULT_TRACKED_FILES
    if args.tracked_files:
        tracked_files = tuple(args.tracked_files)

    # Failures of git/docker, file access, HTTP access or payload parsing are
    # reported as a BLOCKED result instead of a traceback.
    handled_errors = (
        OSError,
        ValueError,
        json.JSONDecodeError,
        subprocess.CalledProcessError,
        urllib.error.URLError,
    )
    try:
        report = build_report(
            root=args.source_root.resolve(),
            health_url=args.health_url,
            timeout=args.timeout,
            gitea_remote=args.gitea_remote,
            tracked_files=tracked_files,
            container_name=args.container_name,
        )
    except handled_errors as exc:
        reason = str(exc)
        if args.json:
            failure = {
                "policy": "p4_source_deployment_runtime_truth_v1",
                "result": "BLOCKED",
                "errors": [reason],
            }
            print(json.dumps(failure, ensure_ascii=False, indent=2))
        else:
            print("source_deploy_runtime_truth:\n- result: BLOCKED\n- blocker: " + reason)
        return 2

    rendered = json.dumps(report, ensure_ascii=False, indent=2) if args.json else format_text(report)
    print(rendered)
    return 0 if report["result"] == "PASS" else 1
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())