diff --git a/services/pchome_mapping_backlog_service.py b/services/pchome_mapping_backlog_service.py index a47a864..8dc44fd 100644 --- a/services/pchome_mapping_backlog_service.py +++ b/services/pchome_mapping_backlog_service.py @@ -4067,6 +4067,145 @@ def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_execu } for selector in selectors ] + summary = { + "controlled_apply_executor_ready_count": 1 if executor_ready else 0, + "execute_apply_requested_count": 1 if execute_apply else 0, + "target_selector_count": len(selectors), + "prewrite_snapshot_count": len(prewrite_snapshots), + "applied_record_count": applied_count, + "post_apply_readback_count": len(post_apply_readbacks), + "post_apply_readback_pass_count": readback_pass_count, + "post_apply_readback_fail_count": len(post_apply_readbacks) - readback_pass_count, + "rollback_step_count": len(rollback_steps), + "missing_table_count": len(missing_tables), + "writes_database_count": applied_count, + "persists_candidate_count": applied_count, + } + executor_metadata = { + "executor_id": executor_id, + "source_preflight_id": preflight.get("preflight_id"), + "run_id": preflight.get("run_id"), + "stage": "P2_retry_exception_controlled_apply_executor", + "status": result, + "execute_apply": bool(execute_apply), + "target_table": "pchome_product_matches", + "ready_for_apply": executor_ready, + "write_attempted": write_attempted, + "missing_tables": missing_tables, + "requires_fresh_production_truth": True, + } + rollback_plan = { + "rollback_step_count": len(rollback_steps), + "rollback_steps": rollback_steps, + "executes_in_executor": False, + "writes_database": False, + } + safety = { + "ai_controlled_apply": True, + "execute_apply": bool(execute_apply), + "target_table": "pchome_product_matches", + "writes_database": bool(applied_count), + "writes_database_count": applied_count, + "persists_candidate": bool(applied_count), + "persists_candidate_count": applied_count, + "syncs_external_offers": False, + "dispatches_telegram": False, + "llm_calls_in_executor": False, + "gemini_allowed": False, + "requires_production_version_truth": True, + } + executor_receipt_ready = ( + result == "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_EXECUTED" + and bool(applied_records) + and readback_pass_count == len(selectors) + ) + root = Path(artifact_root) if artifact_root is not None else Path.cwd() / "data" + receipt_relative_path = ( + f"artifacts/pchome_growth/retry_exception_closeout/" + f"controlled_apply_executor/{executor_id}.json" + ) + executor_receipt_payload = { + "artifact_key": "retry_exception_controlled_apply_executor_receipt", + "executor_id": executor_id, + "source_preflight_id": preflight.get("preflight_id"), + "run_id": preflight.get("run_id"), + "source_policy": preflight_package.get("policy"), + "result": result, + "created_at": preflight_package.get("generated_at"), + "summary": summary, + "controlled_apply_executor": executor_metadata, + "target_selectors": selectors, + "prewrite_snapshots": prewrite_snapshots, + "applied_records": applied_records, + "post_apply_readbacks": post_apply_readbacks, + "rollback_plan": rollback_plan, + "safety": safety, + } + executor_receipt_bytes = _canonical_retry_exception_artifact_bytes(executor_receipt_payload) + executor_receipt_artifact = { + "key": "retry_exception_controlled_apply_executor_receipt", + "artifact_type": "controlled_apply_executor_receipt", + "relative_path": receipt_relative_path, + "payload_sha256": hashlib.sha256(executor_receipt_bytes).hexdigest(), + "byte_count": len(executor_receipt_bytes), + "payload": executor_receipt_payload, + "materialized": False, + "writes_database": False, + } + materialized_executor_artifacts: list[dict[str, Any]] = [] + if materialize_artifacts and executor_receipt_ready: + target_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path) + target_path.parent.mkdir(parents=True, exist_ok=True) + target_path.write_bytes(executor_receipt_bytes) + materialized_executor_artifacts.append({ + "key": executor_receipt_artifact["key"], + "relative_path": receipt_relative_path, + "absolute_path": str(target_path), + "payload_sha256": executor_receipt_artifact["payload_sha256"], + "written_byte_count": target_path.stat().st_size, + "writes_database": False, + }) + executor_receipt_artifact["materialized"] = True + executor_receipt_artifact["absolute_path"] = str(target_path) + + receipt_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path) + actual_receipt_sha = "" + receipt_file_exists = receipt_path.exists() + if receipt_file_exists: + actual_receipt_sha = hashlib.sha256(receipt_path.read_bytes()).hexdigest() + receipt_checks = [ + {"check": "executor_receipt_ready_after_apply", "passed": executor_receipt_ready}, + {"check": "all_post_apply_readbacks_passed", "passed": readback_pass_count == len(selectors)}, + {"check": "applied_record_count_matches_selectors", "passed": applied_count == len(selectors)}, + {"check": "receipt_payload_hash_is_sha256", "passed": len(executor_receipt_artifact["payload_sha256"]) == 64}, + { + "check": "materialized_receipt_exists_when_requested", + "passed": (not materialize_artifacts) or (executor_receipt_ready and receipt_file_exists), + }, + { + "check": "materialized_receipt_hash_matches_expected", + "passed": (not materialize_artifacts) + or (bool(actual_receipt_sha) and actual_receipt_sha == executor_receipt_artifact["payload_sha256"]), + }, + {"check": "receipt_safety_blocks_side_effects", "passed": safety["syncs_external_offers"] is False and safety["dispatches_telegram"] is False}, + ] + post_executor_receipt_verifier = { + "ready": executor_receipt_ready, + "checks": receipt_checks, + "check_count": len(receipt_checks), + "passed": all(check.get("passed") is True for check in receipt_checks), + "expected_sha256": executor_receipt_artifact["payload_sha256"], + "actual_sha256": actual_receipt_sha, + "hash_match": bool(actual_receipt_sha) and actual_receipt_sha == executor_receipt_artifact["payload_sha256"], + "reads_artifact_files": bool(materialize_artifacts), + "writes_database": False, + } + summary["executor_receipt_ready_count"] = 1 if executor_receipt_ready else 0 + summary["executor_receipt_payload_count"] = 1 + summary["executor_receipt_materialized_count"] = len(materialized_executor_artifacts) + summary["executor_receipt_hash_match_count"] = 1 if post_executor_receipt_verifier["hash_match"] else 0 + summary["post_executor_receipt_verifier_check_count"] = len(receipt_checks) + safety["writes_artifact_count"] = len(materialized_executor_artifacts) return { "policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_EXECUTOR_POLICY, "result": result, @@ -4075,64 +4214,24 @@ def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_execu "source_policy": preflight_package.get("policy"), "stats": preflight_package.get("stats") or {}, "backlog": preflight_package.get("backlog") or {}, - "summary": { - "controlled_apply_executor_ready_count": 1 if executor_ready else 0, - "execute_apply_requested_count": 1 if execute_apply else 0, - "target_selector_count": len(selectors), - "prewrite_snapshot_count": len(prewrite_snapshots), - "applied_record_count": applied_count, - "post_apply_readback_count": len(post_apply_readbacks), - "post_apply_readback_pass_count": readback_pass_count, - "post_apply_readback_fail_count": len(post_apply_readbacks) - readback_pass_count, - "rollback_step_count": len(rollback_steps), - "missing_table_count": len(missing_tables), - "writes_database_count": applied_count, - "persists_candidate_count": applied_count, - }, - "controlled_apply_executor": { - "executor_id": executor_id, - "source_preflight_id": preflight.get("preflight_id"), - "run_id": preflight.get("run_id"), - "stage": "P2_retry_exception_controlled_apply_executor", - "status": result, - "execute_apply": bool(execute_apply), - "target_table": "pchome_product_matches", - "ready_for_apply": executor_ready, - "write_attempted": write_attempted, - "missing_tables": missing_tables, - "requires_fresh_production_truth": True, - }, + "summary": summary, + "controlled_apply_executor": executor_metadata, "target_selectors": selectors, "prewrite_snapshots": prewrite_snapshots, "applied_records": applied_records, "post_apply_readbacks": post_apply_readbacks, - "rollback_plan": { - "rollback_step_count": len(rollback_steps), - "rollback_steps": rollback_steps, - "executes_in_executor": False, - "writes_database": False, - }, + "rollback_plan": rollback_plan, + "executor_receipt_artifact": executor_receipt_artifact, + "materialized_executor_artifacts": materialized_executor_artifacts, + "post_executor_receipt_verifier": post_executor_receipt_verifier, "write_blockers": write_blockers, "source_preflight_summary": preflight_package.get("summary") or {}, "next_actions": [ - "Run executor closeout to persist a machine-verifiable receipt for every applied selector.", + "Use the executor receipt artifact as the machine-verifiable closeout source for every applied selector.", "Run rollback steps only if post-apply readback fails or a future verifier detects drift.", "Keep future writes bounded to selector IDs from this executor package.", ], - "safety": { - "ai_controlled_apply": True, - "execute_apply": bool(execute_apply), - "target_table": "pchome_product_matches", - "writes_database": bool(applied_count), - "writes_database_count": applied_count, - "persists_candidate": bool(applied_count), - "persists_candidate_count": applied_count, - "syncs_external_offers": False, - "dispatches_telegram": False, - "llm_calls_in_executor": False, - "gemini_allowed": False, - "requires_production_version_truth": True, - }, + "safety": safety, } diff --git a/tests/test_pchome_mapping_backlog_report.py b/tests/test_pchome_mapping_backlog_report.py index f6de264..817a8af 100644 --- a/tests/test_pchome_mapping_backlog_report.py +++ b/tests/test_pchome_mapping_backlog_report.py @@ -1237,13 +1237,29 @@ def test_direct_mapping_retry_candidate_exception_controlled_apply_executor_writ assert package["summary"]["post_apply_readback_pass_count"] == 2 assert package["summary"]["writes_database_count"] == 2 assert package["summary"]["persists_candidate_count"] == 2 + assert package["summary"]["executor_receipt_ready_count"] == 1 + assert package["summary"]["executor_receipt_materialized_count"] == 1 + assert package["summary"]["executor_receipt_hash_match_count"] == 1 assert package["controlled_apply_executor"]["target_table"] == "pchome_product_matches" assert package["controlled_apply_executor"]["write_attempted"] is True assert package["controlled_apply_executor"]["ready_for_apply"] is True assert all(readback["passed"] for readback in package["post_apply_readbacks"]) assert package["rollback_plan"]["rollback_step_count"] == 2 + assert package["post_executor_receipt_verifier"]["passed"] is True + assert package["post_executor_receipt_verifier"]["hash_match"] is True + assert len(package["materialized_executor_artifacts"]) == 1 + receipt = package["materialized_executor_artifacts"][0] + receipt_path = Path(receipt["absolute_path"]) + assert receipt_path.exists() + assert hashlib.sha256(receipt_path.read_bytes()).hexdigest() == receipt["payload_sha256"] + receipt_payload = json.loads(receipt_path.read_text(encoding="utf-8")) + assert receipt_payload["executor_id"] == package["controlled_apply_executor"]["executor_id"] + assert receipt_payload["summary"]["applied_record_count"] == 2 + assert receipt_payload["summary"]["post_apply_readback_pass_count"] == 2 + assert receipt_payload["safety"]["writes_database_count"] == 2 assert package["safety"]["execute_apply"] is True assert package["safety"]["writes_database_count"] == 2 + assert package["safety"]["writes_artifact_count"] == 1 with engine.connect() as conn: rows = conn.execute(text(""" SELECT momo_icode, pchome_id, advantage