@ai_bp.route('/api/ai/pchome-growth/mapping-backlog/direct-mapping-retry-candidate-exception-controlled-apply-receipt-replay-package')
@login_required
def api_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package():
    """P2 AI-controlled receipt replay for already-applied retry exception product matches.

    Query parameters:
        materialize_artifacts: ``1`` / ``true`` / ``yes`` (case-insensitive)
            asks the service to write the replay receipt artifact to disk.
        run_id: optional run identifier selecting which artifacts to replay;
            an empty value is passed to the service as ``None``.

    Returns:
        The replay package as JSON, or a generic JSON error with HTTP 500 when
        anything raises (details go to the log only).
    """
    try:
        # Imported lazily, matching the other handlers visible around this one.
        from config import DATABASE_PATH
        from services.pchome_mapping_backlog_service import (
            build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package,
        )

        materialize_artifacts = str(request.args.get('materialize_artifacts') or '').strip().lower() in {'1', 'true', 'yes'}
        run_id = str(request.args.get('run_id') or '').strip() or None

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        try:
            package = build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package(
                run_id=run_id,
                materialize_artifacts=materialize_artifacts,
                engine=engine,
            )
        finally:
            # Always release the engine, even when the builder raises.
            engine.dispose()
        # Report THIS route as the source. The original value pointed at the
        # "...-controlled-apply-executor-package" endpoint, which is a
        # different route and mislabels where the payload came from.
        package["source_endpoint"] = (
            "/api/ai/pchome-growth/mapping-backlog/direct-mapping-retry-candidate-exception-controlled-apply-receipt-replay-package"
        )
        return jsonify(package)
    except Exception as exc:
        # Top-level request boundary: log the full traceback, return a generic message.
        logger.error("[PChomeGrowth] direct mapping retry candidate exception controlled apply receipt replay 讀取失敗: %s", exc, exc_info=True)
        return jsonify({
            "success": False,
            "error": "PChome 商品對應 retry 例外 controlled apply receipt replay 暫時無法讀取,請稍後再試。",
        }), 500
d7036a8..790fa24 100644 --- a/services/pchome_mapping_backlog_service.py +++ b/services/pchome_mapping_backlog_service.py @@ -71,6 +71,9 @@ DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_PREFLIGHT_POLICY = ( DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_EXECUTOR_POLICY = ( "ai_controlled_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_executor" ) +DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY = ( + "ai_controlled_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay" +) AI_AUTOMATION_READINESS_POLICY = "read_only_pchome_growth_ai_automation_readiness" EVIDENCE_ENRICHMENT_PREVIEW_POLICY = "read_only_pchome_growth_evidence_enrichment_preview" EVIDENCE_SOURCE_PREVIEW_POLICY = "read_only_pchome_growth_evidence_source_preview" @@ -4237,6 +4240,272 @@ def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_execu } +def _find_retry_exception_artifact_file(root: Path, subdir: str, run_id: str | None = None) -> Path | None: + artifact_dir = root / "artifacts" / "pchome_growth" / "retry_exception_closeout" / subdir + if run_id: + candidate = artifact_dir / f"{run_id}.json" + return candidate if candidate.exists() else None + candidates = sorted( + artifact_dir.glob("*.json"), + key=lambda path: path.stat().st_mtime, + reverse=True, + ) + return candidates[0] if candidates else None + + +def _load_retry_exception_json_artifact(path: Path | None) -> dict[str, Any]: + if path is None or not path.exists(): + return {} + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + return payload if isinstance(payload, dict) else {} + + +def _retry_exception_controlled_apply_receipt_replay_id( + run_id: str, + post_apply_readbacks: list[dict[str, Any]], +) -> str: + payload = { + "run_id": run_id, + "post_apply_readbacks": post_apply_readbacks, + } + digest = hashlib.sha256( + 
json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str).encode("utf-8") + ).hexdigest()[:16] + return f"pchome-retry-exception-controlled-apply-receipt-replay-{digest}" + + +def build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package( + *, + artifact_root: str | Path | None = None, + run_id: str | None = None, + materialize_artifacts: bool = False, + engine: Any = None, +) -> dict[str, Any]: + """Rebuild a machine-verifiable executor closeout receipt from artifacts and DB readback.""" + root = Path(artifact_root) if artifact_root is not None else Path.cwd() / "data" + verifier_path = _find_retry_exception_artifact_file(root, "verifier_inputs", run_id) + identity_path = _find_retry_exception_artifact_file(root, "identity_readback", run_id) + preflight_path = _find_retry_exception_artifact_file(root, "controlled_apply_preflight", run_id) + verifier_payload = _load_retry_exception_json_artifact(verifier_path) + identity_payload = _load_retry_exception_json_artifact(identity_path) + preflight_payload = _load_retry_exception_json_artifact(preflight_path) + effective_run_id = ( + run_id + or verifier_payload.get("run_id") + or identity_payload.get("run_id") + or preflight_payload.get("run_id") + or "" + ) + receipts = list(verifier_payload.get("no_write_verifier_receipts") or []) + target_selectors = [ + { + "selector_id": receipt.get("receipt_id"), + "source_receipt_id": receipt.get("receipt_id"), + "source_closeout_receipt_id": receipt.get("source_closeout_receipt_id"), + "momo_product_id": (receipt.get("subject") or {}).get("momo_product_id"), + "momo_product_name": (receipt.get("subject") or {}).get("momo_product_name"), + "momo_price": (receipt.get("subject") or {}).get("momo_price"), + "target_pchome_product_id": (receipt.get("subject") or {}).get("target_pchome_product_id"), + "target_pchome_product_name": (receipt.get("subject") or {}).get("pchome_product_name"), + } + for receipt in receipts + if 
(receipt.get("subject") or {}).get("momo_product_id") + ] + missing_artifacts = [ + name + for name, payload in ( + ("verifier_inputs", verifier_payload), + ("identity_readback", identity_payload), + ("controlled_apply_preflight", preflight_payload), + ) + if not payload + ] + post_apply_readbacks: list[dict[str, Any]] = [] + if engine is not None and target_selectors: + with engine.connect() as conn: + for selector in target_selectors: + momo_icode = str(selector.get("momo_product_id") or "").strip() + expected_pchome_id = str(selector.get("target_pchome_product_id") or "").strip() + row = _fetch_pchome_product_match_by_momo_icode(conn, momo_icode) + actual_pchome_id = str((row or {}).get("pchome_id") or "").strip() + post_apply_readbacks.append({ + "selector_id": selector.get("selector_id"), + "momo_icode": momo_icode, + "expected_pchome_id": expected_pchome_id, + "actual_pchome_id": actual_pchome_id, + "expected_momo_name": selector.get("momo_product_name"), + "actual_momo_name": (row or {}).get("momo_name"), + "passed": bool(row) and actual_pchome_id == expected_pchome_id, + "writes_database": False, + }) + + readback_pass_count = sum(1 for item in post_apply_readbacks if item.get("passed")) + receipt_ready = ( + not missing_artifacts + and bool(effective_run_id) + and bool(target_selectors) + and engine is not None + and readback_pass_count == len(target_selectors) + ) + if missing_artifacts: + result = "WAITING_FOR_RETRY_EXCEPTION_CONTROLLED_APPLY_REPLAY_ARTIFACTS" + elif engine is None: + result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_BLOCKED_ENGINE_REQUIRED" + elif not target_selectors: + result = "WAITING_FOR_RETRY_EXCEPTION_CONTROLLED_APPLY_REPLAY_SELECTORS" + elif receipt_ready: + result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED" + else: + result = "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_READBACK_MISMATCH" + + receipt_id = 
_retry_exception_controlled_apply_receipt_replay_id(effective_run_id, post_apply_readbacks) + summary = { + "source_receipt_count": len(receipts), + "target_selector_count": len(target_selectors), + "post_apply_readback_count": len(post_apply_readbacks), + "post_apply_readback_pass_count": readback_pass_count, + "post_apply_readback_fail_count": len(post_apply_readbacks) - readback_pass_count, + "missing_artifact_count": len(missing_artifacts), + "executor_receipt_ready_count": 1 if receipt_ready else 0, + "executor_receipt_materialized_count": 0, + "executor_receipt_hash_match_count": 0, + "writes_database_count": 0, + } + safety = { + "ai_controlled_apply": True, + "receipt_replay": True, + "reads_artifact_files": True, + "reads_database": engine is not None, + "writes_database": False, + "writes_database_count": 0, + "writes_artifact_count": 0, + "syncs_external_offers": False, + "dispatches_telegram": False, + "gemini_allowed": False, + "requires_production_version_truth": True, + } + rollback_plan = { + "rollback_step_count": 1 if receipt_ready else 0, + "rollback_steps": [ + { + "action": "delete_materialized_executor_replay_receipt", + "receipt_id": receipt_id, + "executes_in_replay": False, + "writes_database": False, + } + ] if receipt_ready else [], + "writes_database": False, + } + receipt_relative_path = ( + f"artifacts/pchome_growth/retry_exception_closeout/" + f"controlled_apply_executor_replay/{receipt_id}.json" + ) + receipt_payload = { + "artifact_key": "retry_exception_controlled_apply_executor_replay_receipt", + "receipt_id": receipt_id, + "run_id": effective_run_id, + "source_policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY, + "source_artifacts": { + "verifier_inputs": str(verifier_path.relative_to(root)) if verifier_path else None, + "identity_readback": str(identity_path.relative_to(root)) if identity_path else None, + "controlled_apply_preflight": str(preflight_path.relative_to(root)) if preflight_path 
else None, + }, + "result": result, + "summary": summary, + "identity_readback": { + "momo_product_ids": identity_payload.get("momo_product_ids") or [], + "target_pchome_product_ids": identity_payload.get("target_pchome_product_ids") or [], + "identity_delta_status": identity_payload.get("identity_delta_status"), + }, + "target_selectors": target_selectors, + "post_apply_readbacks": post_apply_readbacks, + "rollback_plan": rollback_plan, + "safety": safety, + } + receipt_bytes = _canonical_retry_exception_artifact_bytes(receipt_payload) + receipt_artifact = { + "key": "retry_exception_controlled_apply_executor_replay_receipt", + "artifact_type": "controlled_apply_executor_replay_receipt", + "relative_path": receipt_relative_path, + "payload_sha256": hashlib.sha256(receipt_bytes).hexdigest(), + "byte_count": len(receipt_bytes), + "payload": receipt_payload, + "materialized": False, + "writes_database": False, + } + materialized_artifacts: list[dict[str, Any]] = [] + if materialize_artifacts and receipt_ready: + target_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path) + target_path.parent.mkdir(parents=True, exist_ok=True) + target_path.write_bytes(receipt_bytes) + materialized_artifacts.append({ + "key": receipt_artifact["key"], + "relative_path": receipt_relative_path, + "absolute_path": str(target_path), + "payload_sha256": receipt_artifact["payload_sha256"], + "written_byte_count": target_path.stat().st_size, + "writes_database": False, + }) + receipt_artifact["materialized"] = True + receipt_artifact["absolute_path"] = str(target_path) + + receipt_path = _resolve_retry_exception_artifact_path(root, receipt_relative_path) + actual_sha = hashlib.sha256(receipt_path.read_bytes()).hexdigest() if receipt_path.exists() else "" + verifier_checks = [ + {"check": "source_artifacts_loaded", "passed": not missing_artifacts, "missing_artifacts": missing_artifacts}, + {"check": "target_selectors_present", "passed": bool(target_selectors)}, + 
{"check": "post_apply_readbacks_all_passed", "passed": readback_pass_count == len(target_selectors)}, + { + "check": "receipt_materialized_when_requested", + "passed": (not materialize_artifacts) or (receipt_ready and receipt_path.exists()), + }, + { + "check": "receipt_hash_matches_expected", + "passed": (not materialize_artifacts) or (bool(actual_sha) and actual_sha == receipt_artifact["payload_sha256"]), + }, + {"check": "replay_does_not_write_database", "passed": safety["writes_database"] is False}, + ] + summary["executor_receipt_materialized_count"] = len(materialized_artifacts) + summary["executor_receipt_hash_match_count"] = 1 if actual_sha and actual_sha == receipt_artifact["payload_sha256"] else 0 + summary["post_executor_receipt_verifier_check_count"] = len(verifier_checks) + safety["writes_artifact_count"] = len(materialized_artifacts) + return { + "policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAY_POLICY, + "result": result, + "success": not missing_artifacts, + "summary": summary, + "receipt_replay": { + "receipt_id": receipt_id, + "run_id": effective_run_id, + "stage": "P2_retry_exception_controlled_apply_receipt_replay", + "status": result, + "materialize_artifacts": bool(materialize_artifacts), + "artifact_root": str(root), + "ready": receipt_ready, + }, + "target_selectors": target_selectors, + "post_apply_readbacks": post_apply_readbacks, + "executor_receipt_artifact": receipt_artifact, + "materialized_executor_artifacts": materialized_artifacts, + "post_executor_receipt_verifier": { + "checks": verifier_checks, + "check_count": len(verifier_checks), + "passed": all(check.get("passed") is True for check in verifier_checks), + "expected_sha256": receipt_artifact["payload_sha256"], + "actual_sha256": actual_sha, + "hash_match": bool(actual_sha) and actual_sha == receipt_artifact["payload_sha256"], + "writes_database": False, + }, + "rollback_plan": rollback_plan, + "source_artifacts": 
def test_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_materializes_readback(tmp_path):
    """Replay materializes a hash-verified receipt once DB readback matches the artifacts."""
    from sqlalchemy import create_engine, text

    call_count = {"search": 0}

    def _candidate(product_id, name, price, score, compare_type, hard_veto):
        # Fresh dict per call so the code under test can never share state between searches.
        return {
            "product_id": product_id,
            "name": name,
            "price": price,
            "target_pchome_product_id": "PCH-2",
            "target_pchome_name": "Direct mapping product 40ml x2",
            "target_match_score": score,
            "auto_compare_type": compare_type,
            "target_hard_veto": hard_veto,
        }

    def fake_search(targets, limit_per_product, max_products, max_terms_per_product, min_score):
        call_count["search"] += 1
        is_retry = bool(targets[0].get("source_artifact_id"))
        if not is_retry:
            # First pass: a single hard-vetoed unit-price candidate.
            return True, "found", [
                _candidate("MOMO-UNIT", "Direct mapping product 40ml", 499, 0.91, "unit_price", True),
            ]
        # Retry pass: two manual-review candidates for the same PChome target.
        return True, "retry_found", [
            _candidate("MOMO-RETRY-REVIEW", "Direct mapping product 40ml 多款任選", 499, 0.74, "manual_review", False),
            _candidate("MOMO-RETRY-REVIEW-2", "Direct mapping product 40ml 限量組", 520, 0.91, "manual_review", False),
        ]

    # Step 1: materialize the source artifacts the replay will read.
    artifact_package = build_pchome_direct_mapping_retry_candidate_exception_closeout_verifier_artifact_materialization_package(
        _payload(),
        batch_size=1,
        execute_search=True,
        execute_retry_search=True,
        max_terms_per_product=6,
        search_func=fake_search,
        materialize_artifacts=True,
        artifact_root=tmp_path,
    )
    run_id = artifact_package["artifact_materialization_package"]["run_id"]

    # Step 2: seed a SQLite table that already holds the applied matches.
    create_table_sql = text("""
        CREATE TABLE pchome_product_matches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            momo_name TEXT NOT NULL,
            momo_icode VARCHAR(120) UNIQUE,
            momo_price DOUBLE PRECISION,
            pchome_id VARCHAR(120),
            pchome_name TEXT,
            pchome_url TEXT,
            pchome_price DOUBLE PRECISION,
            pchome_original DOUBLE PRECISION,
            pchome_in_stock BOOLEAN DEFAULT 1,
            similarity DOUBLE PRECISION,
            price_diff DOUBLE PRECISION,
            price_diff_pct DOUBLE PRECISION,
            advantage VARCHAR(20),
            last_checked TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    seed_rows_sql = text("""
        INSERT INTO pchome_product_matches (momo_name, momo_icode, momo_price, pchome_id, pchome_name, advantage)
        VALUES
            ('Direct mapping product 40ml 多款任選', 'MOMO-RETRY-REVIEW', 499, 'PCH-2', 'Direct mapping product 40ml x2', 'ai_match'),
            ('Direct mapping product 40ml 限量組', 'MOMO-RETRY-REVIEW-2', 520, 'PCH-2', 'Direct mapping product 40ml x2', 'ai_match')
    """)
    engine = create_engine(f"sqlite:///{tmp_path / 'receipt-replay.db'}")
    with engine.begin() as conn:
        conn.execute(create_table_sql)
        conn.execute(seed_rows_sql)

    # Step 3: replay against those artifacts and that database.
    package = build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package(
        artifact_root=tmp_path,
        run_id=run_id,
        materialize_artifacts=True,
        engine=engine,
    )

    assert package["policy"] == (
        "ai_controlled_pchome_growth_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay"
    )
    assert package["result"] == "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED"

    summary = package["summary"]
    assert summary["target_selector_count"] == 2
    assert summary["post_apply_readback_pass_count"] == 2
    assert summary["executor_receipt_ready_count"] == 1
    assert summary["executor_receipt_materialized_count"] == 1
    assert summary["executor_receipt_hash_match_count"] == 1

    verifier = package["post_executor_receipt_verifier"]
    assert verifier["passed"] is True
    assert verifier["hash_match"] is True

    safety = package["safety"]
    assert safety["writes_database"] is False
    assert safety["writes_database_count"] == 0
    assert safety["writes_artifact_count"] == 1

    # The file on disk must match the advertised hash and carry the same run.
    receipt = package["materialized_executor_artifacts"][0]
    receipt_path = Path(receipt["absolute_path"])
    assert receipt_path.exists()
    assert hashlib.sha256(receipt_path.read_bytes()).hexdigest() == receipt["payload_sha256"]
    receipt_payload = json.loads(receipt_path.read_text(encoding="utf-8"))
    assert receipt_payload["run_id"] == run_id
    assert receipt_payload["summary"]["post_apply_readback_pass_count"] == 2
    assert call_count["search"] == 2
batch_size=1)