補齊 PChome retry executor receipt replay
Some checks failed
CD Pipeline / deploy (push) Has been cancelled

This commit is contained in:
ogt
2026-07-02 00:51:11 +08:00
parent 034ac63959
commit 2f53e56460
3 changed files with 417 additions and 0 deletions

View File

@@ -71,6 +71,9 @@ DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_PREFLIGHT_POLICY = (
DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_EXECUTOR_POLICY = (
"ai_controlled_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_executor"
)
DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY = (
"ai_controlled_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay"
)
AI_AUTOMATION_READINESS_POLICY = "read_only_pchome_growth_ai_automation_readiness"
EVIDENCE_ENRICHMENT_PREVIEW_POLICY = "read_only_pchome_growth_evidence_enrichment_preview"
EVIDENCE_SOURCE_PREVIEW_POLICY = "read_only_pchome_growth_evidence_source_preview"
@@ -4237,6 +4240,272 @@ def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_execu
}
def _find_retry_exception_artifact_file(root: Path, subdir: str, run_id: str | None = None) -> Path | None:
artifact_dir = root / "artifacts" / "pchome_growth" / "retry_exception_closeout" / subdir
if run_id:
candidate = artifact_dir / f"{run_id}.json"
return candidate if candidate.exists() else None
candidates = sorted(
artifact_dir.glob("*.json"),
key=lambda path: path.stat().st_mtime,
reverse=True,
)
return candidates[0] if candidates else None
def _load_retry_exception_json_artifact(path: Path | None) -> dict[str, Any]:
if path is None or not path.exists():
return {}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {}
return payload if isinstance(payload, dict) else {}
def _retry_exception_controlled_apply_receipt_replay_id(
run_id: str,
post_apply_readbacks: list[dict[str, Any]],
) -> str:
payload = {
"run_id": run_id,
"post_apply_readbacks": post_apply_readbacks,
}
digest = hashlib.sha256(
json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str).encode("utf-8")
).hexdigest()[:16]
return f"pchome-retry-exception-controlled-apply-receipt-replay-{digest}"
def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package(
*,
artifact_root: str | Path | None = None,
run_id: str | None = None,
materialize_artifacts: bool = False,
engine: Any = None,
) -> dict[str, Any]:
"""Rebuild a machine-verifiable executor closeout receipt from artifacts and DB readback."""
root = Path(artifact_root) if artifact_root is not None else Path.cwd() / "data"
verifier_path = _find_retry_exception_artifact_file(root, "verifier_inputs", run_id)
identity_path = _find_retry_exception_artifact_file(root, "identity_readback", run_id)
preflight_path = _find_retry_exception_artifact_file(root, "controlled_apply_preflight", run_id)
verifier_payload = _load_retry_exception_json_artifact(verifier_path)
identity_payload = _load_retry_exception_json_artifact(identity_path)
preflight_payload = _load_retry_exception_json_artifact(preflight_path)
effective_run_id = (
run_id
or verifier_payload.get("run_id")
or identity_payload.get("run_id")
or preflight_payload.get("run_id")
or ""
)
receipts = list(verifier_payload.get("no_write_verifier_receipts") or [])
target_selectors = [
{
"selector_id": receipt.get("receipt_id"),
"source_receipt_id": receipt.get("receipt_id"),
"source_closeout_receipt_id": receipt.get("source_closeout_receipt_id"),
"momo_product_id": (receipt.get("subject") or {}).get("momo_product_id"),
"momo_product_name": (receipt.get("subject") or {}).get("momo_product_name"),
"momo_price": (receipt.get("subject") or {}).get("momo_price"),
"target_pchome_product_id": (receipt.get("subject") or {}).get("target_pchome_product_id"),
"target_pchome_product_name": (receipt.get("subject") or {}).get("pchome_product_name"),
}
for receipt in receipts
if (receipt.get("subject") or {}).get("momo_product_id")
]
missing_artifacts = [
name
for name, payload in (
("verifier_inputs", verifier_payload),
("identity_readback", identity_payload),
("controlled_apply_preflight", preflight_payload),
)
if not payload
]
post_apply_readbacks: list[dict[str, Any]] = []
if engine is not None and target_selectors:
with engine.connect() as conn:
for selector in target_selectors:
momo_icode = str(selector.get("momo_product_id") or "").strip()
expected_pchome_id = str(selector.get("target_pchome_product_id") or "").strip()
row = _fetch_pchome_product_match_by_momo_icode(conn, momo_icode)
actual_pchome_id = str((row or {}).get("pchome_id") or "").strip()
post_apply_readbacks.append({
"selector_id": selector.get("selector_id"),
"momo_icode": momo_icode,
"expected_pchome_id": expected_pchome_id,
"actual_pchome_id": actual_pchome_id,
"expected_momo_name": selector.get("momo_product_name"),
"actual_momo_name": (row or {}).get("momo_name"),
"passed": bool(row) and actual_pchome_id == expected_pchome_id,
"writes_database": False,
})
readback_pass_count = sum(1 for item in post_apply_readbacks if item.get("passed"))
receipt_ready = (
not missing_artifacts
and bool(effective_run_id)
and bool(target_selectors)
and engine is not None
and readback_pass_count == len(target_selectors)
)
if missing_artifacts:
result = "WAITING_FOR_RETRY_EXCEPTION_CONTROLLED_APPLY_REPLAY_ARTIFACTS"
elif engine is None:
result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_BLOCKED_ENGINE_REQUIRED"
elif not target_selectors:
result = "WAITING_FOR_RETRY_EXCEPTION_CONTROLLED_APPLY_REPLAY_SELECTORS"
elif receipt_ready:
result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED"
else:
result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_READBACK_MISMATCH"
receipt_id = _retry_exception_controlled_apply_receipt_replay_id(effective_run_id, post_apply_readbacks)
summary = {
"source_receipt_count": len(receipts),
"target_selector_count": len(target_selectors),
"post_apply_readback_count": len(post_apply_readbacks),
"post_apply_readback_pass_count": readback_pass_count,
"post_apply_readback_fail_count": len(post_apply_readbacks) - readback_pass_count,
"missing_artifact_count": len(missing_artifacts),
"executor_receipt_ready_count": 1 if receipt_ready else 0,
"executor_receipt_materialized_count": 0,
"executor_receipt_hash_match_count": 0,
"writes_database_count": 0,
}
safety = {
"ai_controlled_apply": True,
"receipt_replay": True,
"reads_artifact_files": True,
"reads_database": engine is not None,
"writes_database": False,
"writes_database_count": 0,
"writes_artifact_count": 0,
"syncs_external_offers": False,
"dispatches_telegram": False,
"gemini_allowed": False,
"requires_production_version_truth": True,
}
rollback_plan = {
"rollback_step_count": 1 if receipt_ready else 0,
"rollback_steps": [
{
"action": "delete_materialized_executor_replay_receipt",
"receipt_id": receipt_id,
"executes_in_replay": False,
"writes_database": False,
}
] if receipt_ready else [],
"writes_database": False,
}
receipt_relative_path = (
f"artifacts/pchome_growth/retry_exception_closeout/"
f"controlled_apply_executor_replay/{receipt_id}.json"
)
receipt_payload = {
"artifact_key": "retry_exception_controlled_apply_executor_replay_receipt",
"receipt_id": receipt_id,
"run_id": effective_run_id,
"source_policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY,
"source_artifacts": {
"verifier_inputs": str(verifier_path.relative_to(root)) if verifier_path else None,
"identity_readback": str(identity_path.relative_to(root)) if identity_path else None,
"controlled_apply_preflight": str(preflight_path.relative_to(root)) if preflight_path else None,
},
"result": result,
"summary": summary,
"identity_readback": {
"momo_product_ids": identity_payload.get("momo_product_ids") or [],
"target_pchome_product_ids": identity_payload.get("target_pchome_product_ids") or [],
"identity_delta_status": identity_payload.get("identity_delta_status"),
},
"target_selectors": target_selectors,
"post_apply_readbacks": post_apply_readbacks,
"rollback_plan": rollback_plan,
"safety": safety,
}
receipt_bytes = _canonical_retry_exception_artifact_bytes(receipt_payload)
receipt_artifact = {
"key": "retry_exception_controlled_apply_executor_replay_receipt",
"artifact_type": "controlled_apply_executor_replay_receipt",
"relative_path": receipt_relative_path,
"payload_sha256": hashlib.sha256(receipt_bytes).hexdigest(),
"byte_count": len(receipt_bytes),
"payload": receipt_payload,
"materialized": False,
"writes_database": False,
}
materialized_artifacts: list[dict[str, Any]] = []
if materialize_artifacts and receipt_ready:
target_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path)
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_bytes(receipt_bytes)
materialized_artifacts.append({
"key": receipt_artifact["key"],
"relative_path": receipt_relative_path,
"absolute_path": str(target_path),
"payload_sha256": receipt_artifact["payload_sha256"],
"written_byte_count": target_path.stat().st_size,
"writes_database": False,
})
receipt_artifact["materialized"] = True
receipt_artifact["absolute_path"] = str(target_path)
receipt_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path)
actual_sha = hashlib.sha256(receipt_path.read_bytes()).hexdigest() if receipt_path.exists() else ""
verifier_checks = [
{"check": "source_artifacts_loaded", "passed": not missing_artifacts, "missing_artifacts": missing_artifacts},
{"check": "target_selectors_present", "passed": bool(target_selectors)},
{"check": "post_apply_readbacks_all_passed", "passed": readback_pass_count == len(target_selectors)},
{
"check": "receipt_materialized_when_requested",
"passed": (not materialize_artifacts) or (receipt_ready and receipt_path.exists()),
},
{
"check": "receipt_hash_matches_expected",
"passed": (not materialize_artifacts) or (bool(actual_sha) and actual_sha == receipt_artifact["payload_sha256"]),
},
{"check": "replay_does_not_write_database", "passed": safety["writes_database"] is False},
]
summary["executor_receipt_materialized_count"] = len(materialized_artifacts)
summary["executor_receipt_hash_match_count"] = 1 if actual_sha and actual_sha == receipt_artifact["payload_sha256"] else 0
summary["post_executor_receipt_verifier_check_count"] = len(verifier_checks)
safety["writes_artifact_count"] = len(materialized_artifacts)
return {
"policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY,
"result": result,
"success": not missing_artifacts,
"summary": summary,
"receipt_replay": {
"receipt_id": receipt_id,
"run_id": effective_run_id,
"stage": "P2_retry_exception_controlled_apply_receipt_replay",
"status": result,
"materialize_artifacts": bool(materialize_artifacts),
"artifact_root": str(root),
"ready": receipt_ready,
},
"target_selectors": target_selectors,
"post_apply_readbacks": post_apply_readbacks,
"executor_receipt_artifact": receipt_artifact,
"materialized_executor_artifacts": materialized_artifacts,
"post_executor_receipt_verifier": {
"checks": verifier_checks,
"check_count": len(verifier_checks),
"passed": all(check.get("passed") is True for check in verifier_checks),
"expected_sha256": receipt_artifact["payload_sha256"],
"actual_sha256": actual_sha,
"hash_match": bool(actual_sha) and actual_sha == receipt_artifact["payload_sha256"],
"writes_database": False,
},
"rollback_plan": rollback_plan,
"source_artifacts": receipt_payload["source_artifacts"],
"missing_artifacts": missing_artifacts,
"safety": safety,
}
def build_pchome_evidence_enrichment_preview(payload: dict[str, Any], batch_size: int = 5) -> dict[str, Any]:
"""Build a read-only evidence enrichment package for mapping targets."""
operator_preview = build_pchome_mapping_operator_preview(payload, batch_size=batch_size)