Files
ewoooc/scripts/ops/report_source_deploy_runtime_truth.py
ogt c452ebd1c1
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
修正 P4 report 直接執行路徑
2026-07-02 15:08:46 +08:00

485 lines
18 KiB
Python

#!/usr/bin/env python3
"""Machine-readable P4 source, deployment, and runtime truth report.
This report deliberately keeps source-control success, deployment file
readback, and production runtime health as separate evidence layers. It is
read-only: no database writes, no container lifecycle actions, no secret reads.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import subprocess
import sys
import urllib.error
import urllib.request
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from scripts.ops.check_production_version_truth import parse_config_version
DEFAULT_HEALTH_URL = "https://mo.wooo.work/health"
DEFAULT_GITEA_REMOTE = "ssh://git@192.168.0.110:2222/wooo/ewoooc.git"
DEFAULT_TRACKED_FILES = (
"config.py",
"scripts/ops/check_production_version_truth.py",
"scripts/ops/report_source_deploy_runtime_truth.py",
"docs/guides/pchome_ai_automation_priority_backlog.md",
"docs/AI_INTELLIGENCE_MODULE_SOT.md",
)
CommandRunner = Callable[[list[str], Path], str]
HealthFetcher = Callable[[str, float], dict[str, Any]]
def run_command(args: list[str], cwd: Path) -> str:
result = subprocess.run(
args,
cwd=cwd,
check=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result.stdout.strip()
def fetch_json(url: str, timeout: float) -> dict[str, Any]:
with urllib.request.urlopen(url, timeout=timeout) as response:
payload = response.read().decode("utf-8")
data = json.loads(payload)
if not isinstance(data, dict):
raise ValueError("runtime health payload must be a JSON object")
return data
def _run_git(args: list[str], root: Path, runner: CommandRunner) -> str:
return runner(["git", *args], root)
def parse_ls_remote(output: str) -> dict[str, str]:
refs: dict[str, str] = {}
for line in output.splitlines():
parts = line.split()
if len(parts) >= 2:
refs[parts[1]] = parts[0]
return refs
def read_working_tree_config_version(root: Path) -> str:
return parse_config_version((root / "config.py").read_text(encoding="utf-8"))
def read_head_config_version(root: Path, runner: CommandRunner) -> str:
return parse_config_version(_run_git(["show", "HEAD:config.py"], root, runner))
def collect_source_control(
root: Path,
gitea_remote: str,
tracked_files: Iterable[str],
source_override: dict[str, Any] | None = None,
runner: CommandRunner = run_command,
) -> dict[str, Any]:
if source_override:
working_tree_version = source_override.get("working_tree_config_version") or read_working_tree_config_version(root)
head_config_version = source_override.get("head_config_version") or working_tree_version
tracked_file_status = [] if source_override.get("tracked_files_committed") else [
"override: tracked deployment files not confirmed committed"
]
return {
"truth_source": "Gitea",
"source_mode": "override_for_no_git_deployment_tree",
"local": {
"branch": source_override.get("branch") or "main",
"head": source_override["head"],
"working_tree_config_version": working_tree_version,
"head_config_version": head_config_version,
"tracked_file_status": tracked_file_status,
},
"origin": {
"remote": source_override.get("origin_remote") or "https://gitea.wooo.work/wooo/ewoooc.git",
"main": source_override["origin_main"],
"dev": source_override["origin_dev"],
},
"gitea_ssh": {
"remote": gitea_remote,
"main": source_override["gitea_main"],
"dev": source_override["gitea_dev"],
},
}
origin_refs = parse_ls_remote(
_run_git(["ls-remote", "origin", "refs/heads/main", "refs/heads/dev"], root, runner)
)
gitea_refs = parse_ls_remote(
_run_git(["ls-remote", gitea_remote, "refs/heads/main", "refs/heads/dev"], root, runner)
)
local_head = _run_git(["rev-parse", "HEAD"], root, runner)
return {
"truth_source": "Gitea",
"local": {
"branch": _run_git(["rev-parse", "--abbrev-ref", "HEAD"], root, runner),
"head": local_head,
"working_tree_config_version": read_working_tree_config_version(root),
"head_config_version": read_head_config_version(root, runner),
"tracked_file_status": _run_git(
["status", "--porcelain", "--", *tracked_files],
root,
runner,
).splitlines(),
},
"origin": {
"remote": _run_git(["remote", "get-url", "origin"], root, runner),
"main": origin_refs.get("refs/heads/main"),
"dev": origin_refs.get("refs/heads/dev"),
},
"gitea_ssh": {
"remote": gitea_remote,
"main": gitea_refs.get("refs/heads/main"),
"dev": gitea_refs.get("refs/heads/dev"),
},
}
def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def collect_deployment_files(root: Path, tracked_files: Iterable[str]) -> dict[str, Any]:
files: list[dict[str, Any]] = []
for relpath in tracked_files:
path = root / relpath
exists = path.is_file()
files.append(
{
"path": relpath,
"exists": exists,
"sha256": sha256_file(path) if exists else None,
"size_bytes": path.stat().st_size if exists else None,
}
)
return {
"truth_source": "deployed_file_hash_readback",
"source_root": str(root),
"tracked_file_count": len(files),
"files": files,
}
def collect_container_state(
container_name: str | None,
root: Path,
runner: CommandRunner = run_command,
) -> dict[str, Any]:
if not container_name:
return {"requested": False, "name": None, "status": "skipped"}
raw_state = runner(["docker", "inspect", "--format", "{{json .State}}", container_name], root)
state = json.loads(raw_state)
health = state.get("Health") or {}
return {
"requested": True,
"name": container_name,
"status": state.get("Status"),
"running": bool(state.get("Running")),
"health_status": health.get("Status"),
}
def collect_runtime(
health_url: str,
timeout: float,
root: Path,
container_name: str | None = None,
runner: CommandRunner = run_command,
health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
health = health_fetcher(health_url, timeout)
return {
"truth_source": "production_runtime_readback",
"health_url": health_url,
"health": {
"status": health.get("status"),
"database": health.get("database"),
"version": health.get("version"),
},
"container": collect_container_state(container_name, root, runner),
}
def safety_gates() -> dict[str, Any]:
return {
"github_freeze_enforced": True,
"github_allowed_actions": 0,
"momo_db_protected": True,
"remove_orphans_forbidden": True,
"version_bump_forbidden_in_this_lane": True,
"secret_read_performed": False,
"database_write_performed": False,
"destructive_container_action_performed": False,
}
def summarize(report: dict[str, Any]) -> dict[str, Any]:
source = report["source_control"]
local_head = source["local"]["head"]
source_refs = [
source["origin"]["main"],
source["origin"]["dev"],
source["gitea_ssh"]["main"],
source["gitea_ssh"]["dev"],
]
source_control_ok = all(ref == local_head for ref in source_refs)
tracked_files_committed = not source["local"]["tracked_file_status"]
runtime = report["runtime"]
production_version = runtime["health"]["version"]
head_version = source["local"]["head_config_version"]
working_tree_version = source["local"]["working_tree_config_version"]
production_health_ok = runtime["health"]["status"] == "healthy"
production_version_matches_head = production_version == head_version
version_bump_detected = working_tree_version != head_version or head_version != production_version
deployment = report["deployment"]
deployment_hash_readback_ok = all(file["exists"] and file["sha256"] for file in deployment["files"])
container = runtime["container"]
container_readback_ok = not container["requested"] or (
container.get("running") is True
and container.get("status") == "running"
and container.get("health_status") in {"healthy", None}
)
gates = report["safety_gates"]
safety_ok = (
gates["github_freeze_enforced"]
and gates["github_allowed_actions"] == 0
and gates["momo_db_protected"]
and gates["remove_orphans_forbidden"]
and not gates["secret_read_performed"]
and not gates["database_write_performed"]
and not gates["destructive_container_action_performed"]
)
return {
"source_control_ok": source_control_ok,
"tracked_files_committed": tracked_files_committed,
"deployment_hash_readback_ok": deployment_hash_readback_ok,
"production_health_ok": production_health_ok,
"production_version_matches_head": production_version_matches_head,
"version_bump_detected": version_bump_detected,
"container_readback_ok": container_readback_ok,
"github_freeze_enforced": gates["github_freeze_enforced"],
"momo_db_protected": gates["momo_db_protected"],
"truth_layers_separated": True,
"success": all(
[
source_control_ok,
tracked_files_committed,
deployment_hash_readback_ok,
production_health_ok,
production_version_matches_head,
not version_bump_detected,
container_readback_ok,
safety_ok,
]
),
}
def evaluate(report: dict[str, Any]) -> tuple[bool, list[str]]:
summary = report["summary"]
errors: list[str] = []
if not summary["source_control_ok"]:
errors.append("local HEAD, origin main/dev, and Gitea SSH main/dev are not aligned")
if not summary["tracked_files_committed"]:
errors.append("tracked deployment files have uncommitted source-control changes")
if not summary["deployment_hash_readback_ok"]:
errors.append("one or more tracked deployment files are missing or lack hash readback")
if not summary["production_health_ok"]:
errors.append("production runtime health is not healthy")
if not summary["production_version_matches_head"]:
errors.append("production /health version does not match HEAD config.py")
if summary["version_bump_detected"]:
errors.append("unexpected version drift or bump detected")
if not summary["container_readback_ok"]:
errors.append("container readback was requested but did not return running/healthy")
if not summary["github_freeze_enforced"]:
errors.append("GitHub freeze is not enforced")
if not summary["momo_db_protected"]:
errors.append("momo-db protection gate is not set")
return not errors, errors
def build_report(
*,
root: Path = ROOT,
health_url: str = DEFAULT_HEALTH_URL,
timeout: float = 10.0,
gitea_remote: str = DEFAULT_GITEA_REMOTE,
tracked_files: Iterable[str] = DEFAULT_TRACKED_FILES,
container_name: str | None = None,
source_override: dict[str, Any] | None = None,
runner: CommandRunner = run_command,
health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
tracked_file_tuple = tuple(tracked_files)
report = {
"policy": "p4_source_deployment_runtime_truth_v1",
"source_control": collect_source_control(
root,
gitea_remote,
tracked_file_tuple,
source_override,
runner,
),
"deployment": collect_deployment_files(root, tracked_file_tuple),
"runtime": collect_runtime(health_url, timeout, root, container_name, runner, health_fetcher),
"safety_gates": safety_gates(),
}
report["summary"] = summarize(report)
ok, errors = evaluate(report)
report["result"] = "PASS" if ok else "BLOCKED"
report["errors"] = errors
return report
def _short_sha(value: str | None) -> str:
return value[:12] if value else "missing"
def format_text(report: dict[str, Any]) -> str:
source = report["source_control"]
runtime = report["runtime"]
summary = report["summary"]
container = runtime["container"]
lines = [
"source_deploy_runtime_truth:",
f"- policy: {report['policy']}",
f"- result: {report['result']}",
f"- local_branch: {source['local']['branch']}",
f"- local_head: {_short_sha(source['local']['head'])}",
f"- origin_main: {_short_sha(source['origin']['main'])}",
f"- origin_dev: {_short_sha(source['origin']['dev'])}",
f"- gitea_main: {_short_sha(source['gitea_ssh']['main'])}",
f"- gitea_dev: {_short_sha(source['gitea_ssh']['dev'])}",
f"- tracked_files_committed: {str(summary['tracked_files_committed']).lower()}",
f"- production_health: {runtime['health']['status']} {runtime['health']['database']} {runtime['health']['version']}",
f"- version_bump_detected: {str(summary['version_bump_detected']).lower()}",
f"- deployment_files_hashed: {report['deployment']['tracked_file_count']}",
f"- container: {container.get('name') or 'not_requested'} {container.get('status')}",
f"- truth_layers_separated: {str(summary['truth_layers_separated']).lower()}",
]
for error in report["errors"]:
lines.append(f"- blocker: {error}")
return "\n".join(lines)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--source-root", type=Path, default=ROOT)
parser.add_argument("--health-url", default=DEFAULT_HEALTH_URL)
parser.add_argument("--timeout", type=float, default=10.0)
parser.add_argument("--gitea-remote", default=DEFAULT_GITEA_REMOTE)
parser.add_argument("--container-name")
parser.add_argument("--tracked-file", action="append", dest="tracked_files")
parser.add_argument("--source-head", help="Use explicit source HEAD when deployment tree has no .git")
parser.add_argument("--source-branch", default="main")
parser.add_argument("--origin-main")
parser.add_argument("--origin-dev")
parser.add_argument("--gitea-main")
parser.add_argument("--gitea-dev")
parser.add_argument("--origin-remote", default="https://gitea.wooo.work/wooo/ewoooc.git")
parser.add_argument("--head-config-version")
parser.add_argument("--working-tree-config-version")
parser.add_argument(
"--tracked-files-committed",
action="store_true",
help="Confirm the explicit source HEAD already contains the tracked deployment files.",
)
parser.add_argument("--json", action="store_true", help="Print machine-readable report")
args = parser.parse_args(argv)
tracked_files = tuple(args.tracked_files) if args.tracked_files else DEFAULT_TRACKED_FILES
source_override = None
if args.source_head:
required_overrides = {
"origin_main": args.origin_main,
"origin_dev": args.origin_dev,
"gitea_main": args.gitea_main,
"gitea_dev": args.gitea_dev,
}
missing = [name for name, value in required_overrides.items() if not value]
if missing:
print(
"source_deploy_runtime_truth:\n- result: BLOCKED\n- blocker: missing source override fields: "
+ ", ".join(missing)
)
return 2
source_override = {
"branch": args.source_branch,
"head": args.source_head,
"origin_main": args.origin_main,
"origin_dev": args.origin_dev,
"gitea_main": args.gitea_main,
"gitea_dev": args.gitea_dev,
"origin_remote": args.origin_remote,
"head_config_version": args.head_config_version,
"working_tree_config_version": args.working_tree_config_version,
"tracked_files_committed": args.tracked_files_committed,
}
try:
report = build_report(
root=args.source_root.resolve(),
health_url=args.health_url,
timeout=args.timeout,
gitea_remote=args.gitea_remote,
tracked_files=tracked_files,
container_name=args.container_name,
source_override=source_override,
)
except (
OSError,
ValueError,
json.JSONDecodeError,
subprocess.CalledProcessError,
urllib.error.URLError,
) as exc:
error_report = {
"policy": "p4_source_deployment_runtime_truth_v1",
"result": "BLOCKED",
"errors": [str(exc)],
}
if args.json:
print(json.dumps(error_report, ensure_ascii=False, indent=2))
else:
print("source_deploy_runtime_truth:\n- result: BLOCKED\n- blocker: " + str(exc))
return 2
if args.json:
print(json.dumps(report, ensure_ascii=False, indent=2))
else:
print(format_text(report))
return 0 if report["result"] == "PASS" else 1
if __name__ == "__main__":
raise SystemExit(main())