481 lines
18 KiB
Python
481 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""Machine-readable P4 source, deployment, and runtime truth report.
|
|
|
|
This report deliberately keeps source-control success, deployment file
|
|
readback, and production runtime health as separate evidence layers. It is
|
|
read-only: no database writes, no container lifecycle actions, no secret reads.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import subprocess
|
|
import urllib.error
|
|
import urllib.request
|
|
from collections.abc import Callable, Iterable
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from scripts.ops.check_production_version_truth import parse_config_version
|
|
|
|
|
|
# Directory two levels above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[2]
# Default URL queried for the runtime health payload.
DEFAULT_HEALTH_URL = "https://mo.wooo.work/health"
# Default Gitea remote (SSH URL) handed to `git ls-remote`.
DEFAULT_GITEA_REMOTE = "ssh://git@192.168.0.110:2222/wooo/ewoooc.git"
# Root-relative files checked with `git status` and hashed for readback.
DEFAULT_TRACKED_FILES = (
    "config.py",
    "scripts/ops/check_production_version_truth.py",
    "scripts/ops/report_source_deploy_runtime_truth.py",
    "docs/guides/pchome_ai_automation_priority_backlog.md",
    "docs/AI_INTELLIGENCE_MODULE_SOT.md",
)

# Callable taking (argv list, working directory) and returning command output.
CommandRunner = Callable[[list[str], Path], str]
# Callable taking (url, timeout) and returning the decoded JSON object.
HealthFetcher = Callable[[str, float], dict[str, Any]]
|
|
|
|
|
|
def run_command(args: list[str], cwd: Path) -> str:
    """Execute *args* inside *cwd* and return its stripped standard output.

    Both output streams are captured as text. Because the call uses
    ``check=True``, a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(
        args,
        cwd=cwd,
        check=True,
        text=True,
        capture_output=True,  # same as stdout=PIPE together with stderr=PIPE
    )
    return completed.stdout.strip()
|
|
|
|
|
|
def fetch_json(url: str, timeout: float) -> dict[str, Any]:
    """Open *url*, decode the body as UTF-8 JSON, and return the object.

    Args:
        url: Address passed to ``urllib.request.urlopen``.
        timeout: Timeout forwarded to ``urlopen``.

    Raises:
        ValueError: If the decoded JSON document is not an object (dict).
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        raw_body = response.read()
    document = json.loads(raw_body.decode("utf-8"))
    if isinstance(document, dict):
        return document
    raise ValueError("runtime health payload must be a JSON object")
|
|
|
|
|
|
def _run_git(args: list[str], root: Path, runner: CommandRunner) -> str:
    """Run ``git`` with *args* through *runner*, using *root* as the working directory."""
    git_argv = ["git"]
    git_argv.extend(args)
    return runner(git_argv, root)
|
|
|
|
|
|
def parse_ls_remote(output: str) -> dict[str, str]:
    """Turn ``git ls-remote`` style output into a ``{ref: object_id}`` mapping.

    Every line is split on whitespace; the first field becomes the value and
    the second field the key. Lines with fewer than two fields are ignored,
    and when a ref appears more than once the last occurrence wins.
    """
    rows = (line.split() for line in output.splitlines())
    return {fields[1]: fields[0] for fields in rows if len(fields) >= 2}
|
|
|
|
|
|
def read_working_tree_config_version(root: Path) -> str:
    """Parse the version from the ``config.py`` file on disk under *root*."""
    config_path = root / "config.py"
    config_source = config_path.read_text(encoding="utf-8")
    return parse_config_version(config_source)
|
|
|
|
|
|
def read_head_config_version(root: Path, runner: CommandRunner) -> str:
    """Parse the version from ``config.py`` as stored in the ``HEAD`` commit.

    The file content is obtained with ``git show HEAD:config.py`` executed
    through *runner* in *root*.
    """
    head_config_source = _run_git(["show", "HEAD:config.py"], root, runner)
    return parse_config_version(head_config_source)
|
|
|
|
|
|
def collect_source_control(
    root: Path,
    gitea_remote: str,
    tracked_files: Iterable[str],
    source_override: dict[str, Any] | None = None,
    runner: CommandRunner = run_command,
) -> dict[str, Any]:
    """Collect local, origin, and Gitea SSH branch heads plus config versions.

    When *source_override* is truthy no git command is run. Heads come from
    the override mapping (``head``, ``origin_main``, ``origin_dev``,
    ``gitea_main`` and ``gitea_dev`` are required keys), the working-tree
    version falls back to reading ``config.py`` under *root*, and the HEAD
    version falls back to the working-tree version.

    Otherwise everything is read through *runner* with ``git ls-remote``,
    ``rev-parse``, ``show``, ``status --porcelain`` and ``remote get-url``.
    """
    if source_override:
        override = source_override
        working_tree_version = (
            override.get("working_tree_config_version")
            or read_working_tree_config_version(root)
        )
        head_config_version = override.get("head_config_version") or working_tree_version
        if override.get("tracked_files_committed"):
            tracked_file_status = []
        else:
            # A non-empty status list marks the files as not confirmed committed.
            tracked_file_status = ["override: tracked deployment files not confirmed committed"]

        local_section = {
            "branch": override.get("branch") or "main",
            "head": override["head"],
            "working_tree_config_version": working_tree_version,
            "head_config_version": head_config_version,
            "tracked_file_status": tracked_file_status,
        }
        origin_section = {
            "remote": override.get("origin_remote") or "https://gitea.wooo.work/wooo/ewoooc.git",
            "main": override["origin_main"],
            "dev": override["origin_dev"],
        }
        gitea_section = {
            "remote": gitea_remote,
            "main": override["gitea_main"],
            "dev": override["gitea_dev"],
        }
        return {
            "truth_source": "Gitea",
            "source_mode": "override_for_no_git_deployment_tree",
            "local": local_section,
            "origin": origin_section,
            "gitea_ssh": gitea_section,
        }

    branch_refs = ["refs/heads/main", "refs/heads/dev"]
    origin_listing = _run_git(["ls-remote", "origin", *branch_refs], root, runner)
    origin_refs = parse_ls_remote(origin_listing)
    gitea_listing = _run_git(["ls-remote", gitea_remote, *branch_refs], root, runner)
    gitea_refs = parse_ls_remote(gitea_listing)
    local_head = _run_git(["rev-parse", "HEAD"], root, runner)

    # The remaining reads run in the same order as the keys they populate.
    current_branch = _run_git(["rev-parse", "--abbrev-ref", "HEAD"], root, runner)
    working_tree_version = read_working_tree_config_version(root)
    head_config_version = read_head_config_version(root, runner)
    porcelain = _run_git(["status", "--porcelain", "--", *tracked_files], root, runner)
    origin_url = _run_git(["remote", "get-url", "origin"], root, runner)

    return {
        "truth_source": "Gitea",
        "local": {
            "branch": current_branch,
            "head": local_head,
            "working_tree_config_version": working_tree_version,
            "head_config_version": head_config_version,
            "tracked_file_status": porcelain.splitlines(),
        },
        "origin": {
            "remote": origin_url,
            "main": origin_refs.get("refs/heads/main"),
            "dev": origin_refs.get("refs/heads/dev"),
        },
        "gitea_ssh": {
            "remote": gitea_remote,
            "main": gitea_refs.get("refs/heads/main"),
            "dev": gitea_refs.get("refs/heads/dev"),
        },
    }
|
|
|
|
|
|
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in 1 MiB chunks.
    """
    hasher = hashlib.sha256()
    chunk_size = 1024 * 1024
    with path.open("rb") as stream:
        while chunk := stream.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()
|
|
|
|
|
|
def collect_deployment_files(root: Path, tracked_files: Iterable[str]) -> dict[str, Any]:
    """Describe each tracked file under *root* by existence, hash, and size.

    A path that is not a regular file is reported with ``exists`` False and
    ``None`` for both ``sha256`` and ``size_bytes``.
    """

    def describe(relpath: str) -> dict[str, Any]:
        candidate = root / relpath
        if not candidate.is_file():
            return {"path": relpath, "exists": False, "sha256": None, "size_bytes": None}
        return {
            "path": relpath,
            "exists": True,
            "sha256": sha256_file(candidate),
            "size_bytes": candidate.stat().st_size,
        }

    entries = [describe(relpath) for relpath in tracked_files]
    return {
        "truth_source": "deployed_file_hash_readback",
        "source_root": str(root),
        "tracked_file_count": len(entries),
        "files": entries,
    }
|
|
|
|
|
|
def collect_container_state(
    container_name: str | None,
    root: Path,
    runner: CommandRunner = run_command,
) -> dict[str, Any]:
    """Read a container's state via ``docker inspect`` without modifying it.

    Args:
        container_name: Container to inspect; a falsy value skips the readback.
        root: Working directory handed to *runner*.
        runner: Command runner used to execute ``docker inspect``.

    Returns:
        ``{"requested": False, "name": None, "status": "skipped"}`` when no
        container name is given; otherwise a dict with ``requested``,
        ``name``, ``status``, ``running`` and ``health_status``.

    Raises:
        ValueError: If the inspect output is not a JSON object. This mirrors
            the check in ``fetch_json`` and keeps a malformed payload inside
            the exception types ``main`` reports as BLOCKED, instead of an
            uncaught ``AttributeError``.
    """
    if not container_name:
        return {"requested": False, "name": None, "status": "skipped"}

    raw_state = runner(["docker", "inspect", "--format", "{{json .State}}", container_name], root)
    state = json.loads(raw_state)
    if not isinstance(state, dict):
        raise ValueError("container state payload must be a JSON object")

    # "Health" may be absent or null; any non-object value is treated as
    # "no health information".
    health = state.get("Health")
    if not isinstance(health, dict):
        health = {}

    return {
        "requested": True,
        "name": container_name,
        "status": state.get("Status"),
        "running": bool(state.get("Running")),
        "health_status": health.get("Status"),
    }
|
|
|
|
|
|
def collect_runtime(
    health_url: str,
    timeout: float,
    root: Path,
    container_name: str | None = None,
    runner: CommandRunner = run_command,
    health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
    """Read the runtime health payload and, optionally, the container state.

    Only the ``status``, ``database`` and ``version`` fields of the health
    payload are copied into the result; missing fields become ``None``.
    """
    payload = health_fetcher(health_url, timeout)
    health_section = {field: payload.get(field) for field in ("status", "database", "version")}
    container_section = collect_container_state(container_name, root, runner)
    return {
        "truth_source": "production_runtime_readback",
        "health_url": health_url,
        "health": health_section,
        "container": container_section,
    }
|
|
|
|
|
|
def safety_gates() -> dict[str, Any]:
    """Return the fixed safety-gate declarations attached to every report.

    The values are hard-coded constants; nothing is measured here. A new dict
    is built on each call.
    """
    return dict(
        github_freeze_enforced=True,
        github_allowed_actions=0,
        momo_db_protected=True,
        remove_orphans_forbidden=True,
        version_bump_forbidden_in_this_lane=True,
        secret_read_performed=False,
        database_write_performed=False,
        destructive_container_action_performed=False,
    )
|
|
|
|
|
|
def summarize(report: dict[str, Any]) -> dict[str, Any]:
    """Reduce the collected evidence layers to boolean verdicts.

    Reads the ``source_control``, ``runtime``, ``deployment`` and
    ``safety_gates`` sections of *report*. ``success`` is true only when every
    individual verdict, including the combined safety-gate check, holds.
    """
    source = report["source_control"]
    local = source["local"]
    head_sha = local["head"]
    remote_heads = [
        source[remote][branch]
        for remote in ("origin", "gitea_ssh")
        for branch in ("main", "dev")
    ]
    source_control_ok = all(ref == head_sha for ref in remote_heads)
    # An empty status list means no pending changes on the tracked files.
    tracked_files_committed = not local["tracked_file_status"]

    runtime = report["runtime"]
    health = runtime["health"]
    production_version = health["version"]
    head_version = local["head_config_version"]
    working_tree_version = local["working_tree_config_version"]
    production_health_ok = health["status"] == "healthy"
    production_version_matches_head = production_version == head_version
    # Any disagreement between working tree, HEAD, and production counts as drift.
    version_bump_detected = not (
        working_tree_version == head_version and head_version == production_version
    )

    deployed_files = report["deployment"]["files"]
    deployment_hash_readback_ok = all(
        entry["exists"] and entry["sha256"] for entry in deployed_files
    )

    container = runtime["container"]
    if container["requested"]:
        # A container without a health check reports None, which is accepted.
        container_readback_ok = (
            container.get("running") is True
            and container.get("status") == "running"
            and container.get("health_status") in {"healthy", None}
        )
    else:
        container_readback_ok = True

    gates = report["safety_gates"]
    safety_ok = (
        gates["github_freeze_enforced"]
        and gates["github_allowed_actions"] == 0
        and gates["momo_db_protected"]
        and gates["remove_orphans_forbidden"]
        and not (
            gates["secret_read_performed"]
            or gates["database_write_performed"]
            or gates["destructive_container_action_performed"]
        )
    )

    verdicts = (
        source_control_ok,
        tracked_files_committed,
        deployment_hash_readback_ok,
        production_health_ok,
        production_version_matches_head,
        not version_bump_detected,
        container_readback_ok,
        safety_ok,
    )
    return {
        "source_control_ok": source_control_ok,
        "tracked_files_committed": tracked_files_committed,
        "deployment_hash_readback_ok": deployment_hash_readback_ok,
        "production_health_ok": production_health_ok,
        "production_version_matches_head": production_version_matches_head,
        "version_bump_detected": version_bump_detected,
        "container_readback_ok": container_readback_ok,
        "github_freeze_enforced": gates["github_freeze_enforced"],
        "momo_db_protected": gates["momo_db_protected"],
        "truth_layers_separated": True,
        "success": all(verdicts),
    }
|
|
|
|
|
|
def evaluate(report: dict[str, Any]) -> tuple[bool, list[str]]:
    """Translate ``report["summary"]`` into a pass flag and blocker messages.

    Each summary verdict maps to one fixed message, emitted in a stable order.
    If no specific blocker fires but ``summary["success"]`` is present and
    false, one generic blocker is added. This happens when a safety gate that
    has no message of its own fails, and it keeps the result from being PASS
    while the summary reports failure. Summaries without a ``success`` key
    are evaluated from the individual verdicts only.

    Returns:
        ``(ok, errors)`` where ``ok`` is true exactly when ``errors`` is empty.
    """
    summary = report["summary"]
    # (summary key, value required to pass, message emitted otherwise)
    checks = (
        (
            "source_control_ok",
            True,
            "local HEAD, origin main/dev, and Gitea SSH main/dev are not aligned",
        ),
        (
            "tracked_files_committed",
            True,
            "tracked deployment files have uncommitted source-control changes",
        ),
        (
            "deployment_hash_readback_ok",
            True,
            "one or more tracked deployment files are missing or lack hash readback",
        ),
        ("production_health_ok", True, "production runtime health is not healthy"),
        (
            "production_version_matches_head",
            True,
            "production /health version does not match HEAD config.py",
        ),
        ("version_bump_detected", False, "unexpected version drift or bump detected"),
        (
            "container_readback_ok",
            True,
            "container readback was requested but did not return running/healthy",
        ),
        ("github_freeze_enforced", True, "GitHub freeze is not enforced"),
        ("momo_db_protected", True, "momo-db protection gate is not set"),
    )
    errors: list[str] = [
        message
        for key, required, message in checks
        if bool(summary[key]) is not required
    ]
    if not errors and not summary.get("success", True):
        errors.append("summary success is false: a safety gate without a specific blocker failed")
    return not errors, errors
|
|
|
|
|
|
def build_report(
    *,
    root: Path = ROOT,
    health_url: str = DEFAULT_HEALTH_URL,
    timeout: float = 10.0,
    gitea_remote: str = DEFAULT_GITEA_REMOTE,
    tracked_files: Iterable[str] = DEFAULT_TRACKED_FILES,
    container_name: str | None = None,
    source_override: dict[str, Any] | None = None,
    runner: CommandRunner = run_command,
    health_fetcher: HealthFetcher = fetch_json,
) -> dict[str, Any]:
    """Assemble the full report: evidence sections, summary, result, and errors.

    The sections are collected in the order source control, deployment
    files, runtime. ``result`` is ``"PASS"`` when evaluation yields no
    blockers and ``"BLOCKED"`` otherwise.
    """
    # Materialise once so a one-shot iterable can feed both collectors.
    files = tuple(tracked_files)
    source_section = collect_source_control(root, gitea_remote, files, source_override, runner)
    deployment_section = collect_deployment_files(root, files)
    runtime_section = collect_runtime(
        health_url, timeout, root, container_name, runner, health_fetcher
    )
    report = {
        "policy": "p4_source_deployment_runtime_truth_v1",
        "source_control": source_section,
        "deployment": deployment_section,
        "runtime": runtime_section,
        "safety_gates": safety_gates(),
    }
    report["summary"] = summarize(report)
    passed, blockers = evaluate(report)
    report.update(result="PASS" if passed else "BLOCKED", errors=blockers)
    return report
|
|
|
|
|
|
def _short_sha(value: str | None) -> str:
    """Return the first 12 characters of *value*, or ``"missing"`` when it is falsy."""
    if not value:
        return "missing"
    return value[:12]
|
|
|
|
|
|
def format_text(report: dict[str, Any]) -> str:
    """Render *report* as a plain-text list, one ``- key: value`` per line.

    Commit ids are shortened with ``_short_sha``, booleans are printed in
    lower case, and each entry of ``report["errors"]`` is appended as a
    ``- blocker:`` line.
    """
    source = report["source_control"]
    runtime = report["runtime"]
    summary = report["summary"]
    container = runtime["container"]
    health = runtime["health"]

    def flag(name: str) -> str:
        return str(summary[name]).lower()

    fields = [
        ("policy", report["policy"]),
        ("result", report["result"]),
        ("local_branch", source["local"]["branch"]),
        ("local_head", _short_sha(source["local"]["head"])),
        ("origin_main", _short_sha(source["origin"]["main"])),
        ("origin_dev", _short_sha(source["origin"]["dev"])),
        ("gitea_main", _short_sha(source["gitea_ssh"]["main"])),
        ("gitea_dev", _short_sha(source["gitea_ssh"]["dev"])),
        ("tracked_files_committed", flag("tracked_files_committed")),
        ("production_health", f"{health['status']} {health['database']} {health['version']}"),
        ("version_bump_detected", flag("version_bump_detected")),
        ("deployment_files_hashed", report["deployment"]["tracked_file_count"]),
        ("container", f"{container.get('name') or 'not_requested'} {container.get('status')}"),
        ("truth_layers_separated", flag("truth_layers_separated")),
    ]

    lines = ["source_deploy_runtime_truth:"]
    lines += [f"- {label}: {value}" for label, value in fields]
    lines += [f"- blocker: {error}" for error in report["errors"]]
    return "\n".join(lines)
|
|
|
|
|
|
def _print_blocked(messages: list[str], *, as_json: bool) -> None:
    """Print a BLOCKED outcome as a JSON document or as plain-text blocker lines."""
    if as_json:
        payload = {
            "policy": "p4_source_deployment_runtime_truth_v1",
            "result": "BLOCKED",
            "errors": messages,
        }
        print(json.dumps(payload, ensure_ascii=False, indent=2))
        return
    lines = ["source_deploy_runtime_truth:", "- result: BLOCKED"]
    lines.extend(f"- blocker: {message}" for message in messages)
    print("\n".join(lines))


def main(argv: list[str] | None = None) -> int:
    """Parse command-line options, build the report, and print it.

    With ``--source-head`` the source-control layer is taken from explicit
    override options instead of git; ``--origin-main``, ``--origin-dev``,
    ``--gitea-main`` and ``--gitea-dev`` are then mandatory.

    Every outcome honours ``--json``, including the early exit for missing
    override fields, so machine consumers always receive a JSON document.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` when the report result is PASS, ``1`` when the report is
        BLOCKED, and ``2`` when required override fields are missing or
        collecting the report raised one of the handled exceptions.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--source-root", type=Path, default=ROOT)
    parser.add_argument("--health-url", default=DEFAULT_HEALTH_URL)
    parser.add_argument("--timeout", type=float, default=10.0)
    parser.add_argument("--gitea-remote", default=DEFAULT_GITEA_REMOTE)
    parser.add_argument("--container-name")
    parser.add_argument("--tracked-file", action="append", dest="tracked_files")
    parser.add_argument("--source-head", help="Use explicit source HEAD when deployment tree has no .git")
    parser.add_argument("--source-branch", default="main")
    parser.add_argument("--origin-main")
    parser.add_argument("--origin-dev")
    parser.add_argument("--gitea-main")
    parser.add_argument("--gitea-dev")
    parser.add_argument("--origin-remote", default="https://gitea.wooo.work/wooo/ewoooc.git")
    parser.add_argument("--head-config-version")
    parser.add_argument("--working-tree-config-version")
    parser.add_argument(
        "--tracked-files-committed",
        action="store_true",
        help="Confirm the explicit source HEAD already contains the tracked deployment files.",
    )
    parser.add_argument("--json", action="store_true", help="Print machine-readable report")
    args = parser.parse_args(argv)

    tracked_files = tuple(args.tracked_files) if args.tracked_files else DEFAULT_TRACKED_FILES
    source_override = None
    if args.source_head:
        required_overrides = {
            "origin_main": args.origin_main,
            "origin_dev": args.origin_dev,
            "gitea_main": args.gitea_main,
            "gitea_dev": args.gitea_dev,
        }
        missing = [name for name, value in required_overrides.items() if not value]
        if missing:
            _print_blocked(
                ["missing source override fields: " + ", ".join(missing)],
                as_json=args.json,
            )
            return 2
        source_override = {
            "branch": args.source_branch,
            "head": args.source_head,
            **required_overrides,
            "origin_remote": args.origin_remote,
            "head_config_version": args.head_config_version,
            "working_tree_config_version": args.working_tree_config_version,
            "tracked_files_committed": args.tracked_files_committed,
        }

    try:
        report = build_report(
            root=args.source_root.resolve(),
            health_url=args.health_url,
            timeout=args.timeout,
            gitea_remote=args.gitea_remote,
            tracked_files=tracked_files,
            container_name=args.container_name,
            source_override=source_override,
        )
    except (
        OSError,
        ValueError,
        json.JSONDecodeError,
        subprocess.CalledProcessError,
        urllib.error.URLError,
    ) as exc:
        # Collection failures are reported as a BLOCKED result, not a traceback.
        _print_blocked([str(exc)], as_json=args.json)
        return 2

    if args.json:
        print(json.dumps(report, ensure_ascii=False, indent=2))
    else:
        print(format_text(report))
    return 0 if report["result"] == "PASS" else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|