補齊 PChome retry 例外自動解法
Some checks failed
CD Pipeline / deploy (push) Has been cancelled

This commit is contained in:
ogt
2026-07-01 21:38:09 +08:00
parent a9c2d4b628
commit f02cfd25c5
3 changed files with 252 additions and 0 deletions

View File

@@ -47,6 +47,9 @@ DIRECT_MAPPING_CANDIDATE_EXCEPTION_RESOLUTION_CLOSEOUT_POLICY = (
DIRECT_MAPPING_RETRY_CANDIDATE_DECISION_PACKAGE_POLICY = (
"read_only_pchome_growth_direct_mapping_retry_candidate_decision_package"
)
DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_AUTO_RESOLUTION_POLICY = (
"read_only_pchome_growth_direct_mapping_retry_candidate_exception_auto_resolution"
)
AI_AUTOMATION_READINESS_POLICY = "read_only_pchome_growth_ai_automation_readiness"
EVIDENCE_ENRICHMENT_PREVIEW_POLICY = "read_only_pchome_growth_evidence_enrichment_preview"
EVIDENCE_SOURCE_PREVIEW_POLICY = "read_only_pchome_growth_evidence_source_preview"
@@ -2470,6 +2473,99 @@ def build_pchome_direct_mapping_retry_candidate_decision_package(
}
def build_pchome_direct_mapping_retry_candidate_exception_auto_resolution_package(
payload: dict[str, Any],
batch_size: int = 5,
*,
execute_search: bool = False,
execute_retry_search: bool = False,
limit_per_product: int = 8,
max_terms_per_product: int = 5,
min_score: float = 0.45,
search_func: Any = None,
) -> dict[str, Any]:
"""Build retry-exception auto-resolution artifacts without routing to human review."""
retry_package = build_pchome_direct_mapping_retry_candidate_decision_package(
payload,
batch_size=batch_size,
execute_search=execute_search,
execute_retry_search=execute_retry_search,
limit_per_product=limit_per_product,
max_terms_per_product=max_terms_per_product,
min_score=min_score,
search_func=search_func,
)
package = retry_package.get("retry_candidate_decision_package") or {}
retry_exception_receipts = list(package.get("machine_review_exception_receipts") or [])
artifacts = _build_candidate_exception_auto_resolution_artifacts(retry_exception_receipts)
artifact_summary = _summarize_exception_auto_resolution_artifacts(artifacts)
selected_direct_count = int((retry_package.get("summary") or {}).get("selected_direct_mapping_count") or 0)
retry_decision_count = int((retry_package.get("summary") or {}).get("retry_candidate_decision_count") or 0)
if not selected_direct_count:
result = "NO_DIRECT_MAPPING_TARGETS"
elif artifacts:
result = "DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_AUTO_RESOLUTION_READY"
elif retry_decision_count:
result = "DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTIONS_CLEAR"
else:
result = "WAITING_FOR_RETRY_CANDIDATE_DECISIONS"
return {
"policy": DIRECT_MAPPING_RETRY_CANDIDATE_EXCEPTION_AUTO_RESOLUTION_POLICY,
"result": result,
"success": bool(retry_package.get("success")),
"generated_at": retry_package.get("generated_at"),
"source_policy": retry_package.get("policy"),
"stats": retry_package.get("stats") or {},
"backlog": retry_package.get("backlog") or {},
"summary": {
"direct_mapping_count": int((retry_package.get("summary") or {}).get("direct_mapping_count") or 0),
"selected_direct_mapping_count": selected_direct_count,
"retry_candidate_count": int((retry_package.get("summary") or {}).get("retry_candidate_count") or 0),
"retry_candidate_decision_count": retry_decision_count,
"retry_no_write_verifier_input_count": int(
(retry_package.get("summary") or {}).get("retry_no_write_verifier_input_count") or 0
),
"retry_machine_review_exception_count": len(retry_exception_receipts),
"retry_exception_auto_resolution_artifact_count": len(artifacts),
"ready_for_no_write_verifier_count": int(
(retry_package.get("summary") or {}).get("ready_for_no_write_verifier_count") or 0
),
**artifact_summary,
"writes_database_count": 0,
"persists_candidate_count": 0,
},
"retry_exception_auto_resolution_package": {
"stage": "P2_retry_candidate_exception_auto_resolution",
"execute_search": bool(execute_search),
"execute_retry_search": bool(execute_retry_search),
"retry_machine_review_exception_receipts": retry_exception_receipts,
"retry_exception_auto_resolution_artifacts": artifacts,
"resolution_mode": "ai_controlled_read_only",
"manual_review_mode": "exception_only",
},
"upstream_retry_decision_summary": retry_package.get("summary") or {},
"next_actions": [
"Feed retry exception artifacts into a retry exception closeout package before another search cycle.",
"Send ready no-write verifier receipts into verifier artifact preview before any persistence.",
"Keep database writes behind controlled apply, rollback, verifier, and production readback.",
],
"safety": {
"read_only_preview": True,
"executes_search": bool(execute_search),
"executes_retry_search": bool(execute_retry_search),
"writes_database": False,
"persists_candidate": False,
"syncs_external_offers": False,
"dispatches_telegram": False,
"llm_calls_in_preview": False,
"gemini_allowed": False,
"requires_production_version_truth": True,
},
}
def build_pchome_evidence_enrichment_preview(payload: dict[str, Any], batch_size: int = 5) -> dict[str, Any]:
"""Build a read-only evidence enrichment package for mapping targets."""
operator_preview = build_pchome_mapping_operator_preview(payload, batch_size=batch_size)