Files
ewoooc/services/market_intel/manual_sample_review.py
ogt 08d9e3fe7d
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除市場情報 P3 相容人工語意
2026-07-01 18:24:51 +08:00

520 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Review preview for market-intel AI-controlled sample results.

This module evaluates a sample result payload supplied by a controlled source
using pure functions only. It does not fetch external sites, does not query or
write the database, does not create candidate campaigns and does not attach to
a scheduler.
"""
from importlib import import_module
from services.market_intel.ai_controlled_service_compat import sample_payload_key
# The acceptance module is imported dynamically at module load: its name is not
# hard-coded but derived from sample_payload_key('acceptance').
_acceptance_module = import_module(
f"services.market_intel.{sample_payload_key('acceptance')}"
)
# Re-export the required-field collections defined by that acceptance module;
# the evaluation code in this file iterates over them to detect missing fields.
REQUIRED_DIAGNOSTIC_FIELDS = _acceptance_module.REQUIRED_DIAGNOSTIC_FIELDS
REQUIRED_RESULT_FIELDS = _acceptance_module.REQUIRED_RESULT_FIELDS
# Baseline acceptance thresholds. Individual entries can be overridden by the
# "acceptance_thresholds" mapping of an acceptance contract.
DEFAULT_ACCEPTANCE_THRESHOLDS = dict(
    http_status_min=200,
    http_status_max=299,
    minimum_content_length=500,
    page_hash_length=64,
    minimum_title_length=2,
    minimum_link_count=1,
    minimum_campaign_candidates=1,
    accepted_candidate_bands=["high", "medium"],
)
def _as_int(value, default=0):
    """Coerce *value* to ``int``, returning *default* when that is impossible.

    Args:
        value: Any object; typically a number or numeric string taken from a
            sample result payload.
        default: Value returned when *value* cannot be converted.

    Returns:
        ``int(value)`` on success, otherwise *default*.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / containers; ValueError: non-numeric strings and
        # NaN; OverflowError: float infinities (json.loads accepts
        # ``Infinity``), which previously escaped and crashed the review.
        return default
def _thresholds(acceptance_contract):
    """Return the effective acceptance thresholds for *acceptance_contract*.

    Starts from ``DEFAULT_ACCEPTANCE_THRESHOLDS`` and overlays the contract's
    ``acceptance_thresholds`` mapping when one is present.

    Args:
        acceptance_contract: Contract dict, or any other value (ignored).

    Returns:
        A new dict of thresholds. The ``accepted_candidate_bands`` list is a
        copy, so callers may embed it in a response without exposing the
        module-level default to mutation.
    """
    overrides = {}
    if isinstance(acceptance_contract, dict):
        configured = acceptance_contract.get("acceptance_thresholds")
        # Only a mapping can be merged with ``**``; any other truthy shape
        # (list, string, ...) used to raise TypeError and is now ignored.
        if isinstance(configured, dict):
            overrides = configured

    merged = {**DEFAULT_ACCEPTANCE_THRESHOLDS, **overrides}

    # The merge above is shallow: without this copy the returned list would be
    # the very object stored in the defaults (or in the contract).
    bands = merged["accepted_candidate_bands"]
    if isinstance(bands, list):
        merged["accepted_candidate_bands"] = list(bands)
    return merged
def _normalize_candidate(candidate):
    """Map one raw campaign-link candidate onto a fixed four-key dict.

    Args:
        candidate: Raw candidate entry; expected to be a dict, anything else
            yields the placeholder record.

    Returns:
        A dict with ``confidence_band`` (str, ``"unknown"`` when absent),
        ``score`` (int, 0 when absent or not convertible), ``url`` (falls back
        to the ``href`` key) and ``text`` (falls back to the ``title`` key).
    """
    if isinstance(candidate, dict):
        band = candidate.get("confidence_band") or "unknown"
        raw_score = candidate.get("score")
        link = candidate.get("url") or candidate.get("href") or ""
        caption = candidate.get("text") or candidate.get("title") or ""
        return {
            "confidence_band": str(band),
            "score": _as_int(raw_score, 0),
            "url": str(link),
            "text": str(caption),
        }

    # Non-dict entries cannot carry any of the fields; return an inert record
    # whose band is never among the accepted ones by default.
    return {
        "confidence_band": "unknown",
        "score": 0,
        "url": "",
        "text": "",
    }
def _build_check(key, label, passed, observed, expected):
    """Assemble one review-check record.

    Args:
        key: Identifier of the check.
        label: Human-readable description of the check.
        passed: Truthy when the check succeeded.
        observed: Value that was actually seen.
        expected: Value or bound that was required.

    Returns:
        A dict whose ``status`` is ``"pass"`` or ``"block"`` and whose
        ``passed`` is the boolean form of *passed*.
    """
    ok = bool(passed)
    status = "pass" if ok else "block"
    return dict(
        key=key,
        label=label,
        status=status,
        passed=ok,
        observed=observed,
        expected=expected,
    )
def evaluate_ai_controlled_sample_result(sample_result, acceptance_contract):
    """Evaluate one sample result against the acceptance contract; no IO.

    Args:
        sample_result: The sample result payload as a dict. A falsy value or
            any non-dict value is reported as "no sample result loaded".
        acceptance_contract: Contract dict whose optional
            ``acceptance_thresholds`` entry overrides the default thresholds.

    Returns:
        A dict with the review flags, ``review_result``, the full list of
        ``review_checks``, one ``review_findings`` entry per failed check and
        a ``candidate_summary``. ``candidate_import_allowed`` is always False.
    """
    thresholds = _thresholds(acceptance_contract)
    accepted_band_config = thresholds["accepted_candidate_bands"]

    # A truthy non-dict payload (e.g. a JSON array or a string) has no
    # ``.get``; treat it like a missing payload instead of raising
    # AttributeError further down.
    if not isinstance(sample_result, dict) or not sample_result:
        return {
            "sample_result_loaded": False,
            "sample_result_reviewed": False,
            "sample_result_accepted": False,
            "ready_for_candidate_preview": False,
            "candidate_import_allowed": False,
            "review_result": "planned_no_sample_result",
            "review_checks": [],
            "review_findings": [
                {
                    "key": "sample_result_not_loaded",
                    "severity": "block",
                    "label": "尚未載入 AI 受控樣本結果,維持預覽狀態",
                },
            ],
            "candidate_summary": {
                "candidate_count": 0,
                "accepted_candidate_count": 0,
                "accepted_candidate_bands": accepted_band_config,
                "top_candidates": [],
            },
        }

    # Tolerate malformed nesting: anything that is not the expected container
    # type is replaced by an empty one so the checks below report it as missing.
    diagnostics = sample_result.get("diagnostics")
    if not isinstance(diagnostics, dict):
        diagnostics = {}
    raw_candidates = diagnostics.get("campaign_link_candidates")
    if not isinstance(raw_candidates, list):
        raw_candidates = []

    normalized_candidates = [
        _normalize_candidate(item) for item in raw_candidates
    ]
    accepted_bands = set(accepted_band_config)
    accepted_candidates = [
        item
        for item in normalized_candidates
        if item["confidence_band"] in accepted_bands
    ]

    # A field counts as missing when it is absent, None or the empty string.
    missing_result_fields = [
        field
        for field in REQUIRED_RESULT_FIELDS
        if sample_result.get(field) in (None, "")
    ]
    missing_diagnostic_fields = [
        field
        for field in REQUIRED_DIAGNOSTIC_FIELDS
        if diagnostics.get(field) in (None, "")
    ]

    status_code = _as_int(sample_result.get("status_code"), 0)
    content_length = _as_int(sample_result.get("content_length"), 0)
    page_hash = str(sample_result.get("page_hash") or "")
    title = str(sample_result.get("title") or "")
    link_count = _as_int(diagnostics.get("link_count"), 0)

    hash_length = len(page_hash)
    title_length = len(title.strip())
    accepted_count = len(accepted_candidates)

    checks = [
        _build_check(
            "required_result_fields_present",
            "sample result 必須包含 Phase 48 定義的必要欄位",
            not missing_result_fields,
            missing_result_fields,
            list(REQUIRED_RESULT_FIELDS),
        ),
        _build_check(
            "required_diagnostics_present",
            "diagnostics 必須包含連結與候選診斷欄位",
            not missing_diagnostic_fields,
            missing_diagnostic_fields,
            list(REQUIRED_DIAGNOSTIC_FIELDS),
        ),
        _build_check(
            "http_status_ok",
            "HTTP status 必須落在允收區間",
            thresholds["http_status_min"]
            <= status_code
            <= thresholds["http_status_max"],
            status_code,
            f"{thresholds['http_status_min']}-{thresholds['http_status_max']}",
        ),
        _build_check(
            "content_has_body",
            "content_length 必須超過最低門檻",
            content_length >= thresholds["minimum_content_length"],
            content_length,
            thresholds["minimum_content_length"],
        ),
        _build_check(
            "page_fingerprint_present",
            "page_hash 必須符合固定長度,後續才能比對頁面變化",
            hash_length == thresholds["page_hash_length"],
            hash_length,
            thresholds["page_hash_length"],
        ),
        _build_check(
            "title_present",
            "title 必須能判斷頁面內容,不接受空白或挑戰頁",
            title_length >= thresholds["minimum_title_length"],
            title_length,
            thresholds["minimum_title_length"],
        ),
        _build_check(
            "link_count_present",
            "link_count 必須高於最低門檻,避免空頁或驗證頁",
            link_count >= thresholds["minimum_link_count"],
            link_count,
            thresholds["minimum_link_count"],
        ),
        _build_check(
            "candidate_quality_reviewed",
            "至少需要一筆 high/medium 活動候選進入 AI 受控候選預覽",
            accepted_count >= thresholds["minimum_campaign_candidates"],
            accepted_count,
            thresholds["minimum_campaign_candidates"],
        ),
    ]

    # Every failed check becomes one blocking finding.
    findings = [
        {
            "key": check["key"],
            "severity": "block",
            "label": check["label"],
            "observed": check["observed"],
            "expected": check["expected"],
        }
        for check in checks
        if not check["passed"]
    ]
    accepted = all(check["passed"] for check in checks)

    return {
        "sample_result_loaded": True,
        "sample_result_reviewed": True,
        "sample_result_accepted": accepted,
        "ready_for_candidate_preview": accepted,
        # Importing candidates is never unlocked by this evaluation.
        "candidate_import_allowed": False,
        "review_result": (
            "accepted_for_candidate_preview"
            if accepted
            else "rejected_sample_result"
        ),
        "review_checks": checks,
        "review_findings": findings,
        "candidate_summary": {
            "candidate_count": len(normalized_candidates),
            "accepted_candidate_count": accepted_count,
            "accepted_candidate_bands": accepted_band_config,
            # Only the first five accepted candidates are echoed back.
            "top_candidates": accepted_candidates[:5],
        },
    }
def build_ai_controlled_sample_review_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
):
    """Build the review preview for an AI-controlled sample result.

    By default no sample result is loaded, in which case the preview reports
    the planned (not yet reviewed) state.

    Args:
        runtime_status: Object whose ``database_write_allowed`` and
            ``scheduler_attached`` attributes are read (missing attributes
            count as False).
        acceptance_contract: Contract dict; its ``contract_ready`` entry feeds
            the ``acceptance_contract_ready`` gate.
        sample_result: Optional sample result payload to evaluate.

    Returns:
        A dict combining the evaluation outcome, the gate checks, the list of
        blocked reasons, operator next actions and the safe boundaries. All
        write/network/scheduler flags in the payload are literal False.
    """
    evaluation = evaluate_ai_controlled_sample_result(
        sample_result,
        acceptance_contract,
    )

    contract_ready = bool(
        acceptance_contract and acceptance_contract.get("contract_ready")
    )
    db_write_allowed = bool(
        getattr(runtime_status, "database_write_allowed", False)
    )
    scheduler_attached = bool(
        getattr(runtime_status, "scheduler_attached", False)
    )
    gate_checks = {
        "acceptance_contract_ready": contract_ready,
        "sample_review_is_pure_function": True,
        "external_network_blocked_in_review": True,
        "database_write_still_blocked": not db_write_allowed,
        "scheduler_detached": not scheduler_attached,
    }

    # Failed gates come first, followed by sample-state reasons; the final
    # entry is appended unconditionally.
    blocked_reasons = []
    for gate_name, gate_ok in gate_checks.items():
        if not gate_ok:
            blocked_reasons.append(gate_name)
    if not evaluation["sample_result_loaded"]:
        blocked_reasons.append("sample_result_not_loaded")
    if not evaluation["ready_for_candidate_preview"]:
        blocked_reasons.append("candidate_preview_not_ready")
    blocked_reasons.append(
        "candidate_import_still_blocked_until_operator_approval"
    )

    operator_next_actions = [
        {
            "key": "load_single_sample_result_manually",
            "label": "由操作員提供單一平台 sample result JSON再用純函式審核",
            "write_status": "blocked",
        },
        {
            "key": "open_candidate_preview_after_pass",
            "label": "審核通過後只開候選活動預覽,仍不得寫入 market_campaigns",
            "write_status": "blocked",
        },
        {
            "key": "revise_adapter_source_after_reject",
            "label": "審核未通過時調整 adapter source 或暫停該平台",
            "write_status": "blocked",
        },
    ]
    safe_boundaries = [
        "do_not_fetch_external_pages_from_review_api",
        "do_not_store_sample_result_from_review_preview",
        "do_not_import_candidates_from_review_preview",
        "do_not_write_market_tables_from_review_preview",
        "do_not_attach_scheduler_from_review_preview",
        "do_not_touch_momo_db_lifecycle",
    ]

    mode_prefix = sample_payload_key('review')
    return {
        "mode": f"{mode_prefix}_preview",
        "contract_ready": bool(gate_checks["acceptance_contract_ready"]),
        "sample_result_loaded": evaluation["sample_result_loaded"],
        "sample_result_reviewed": evaluation["sample_result_reviewed"],
        "sample_result_accepted": evaluation["sample_result_accepted"],
        "ready_for_candidate_preview": evaluation["ready_for_candidate_preview"],
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "gate_checks": gate_checks,
        "blocked_reasons": blocked_reasons,
        "review_result": evaluation["review_result"],
        "review_checks": evaluation["review_checks"],
        "review_findings": evaluation["review_findings"],
        "candidate_summary": evaluation["candidate_summary"],
        "operator_next_actions": operator_next_actions,
        "safe_boundaries": safe_boundaries,
    }
def build_ai_controlled_sample_review_evaluation_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
):
    """Build the immediate review preview for an operator-POSTed sample result.

    The payload is evaluated but never stored.

    Args:
        runtime_status: Passed through to the review preview builder.
        acceptance_contract: Passed through to the review preview builder.
        sample_result: The posted payload; only a dict is considered valid.
        payload_error: Truthy when the caller already failed to parse the
            payload; this marks the payload invalid.

    Returns:
        The review preview dict extended with payload bookkeeping keys. When
        the payload is invalid an extra blocked reason and an extra finding
        are appended.
    """
    payload_received = sample_result is not None
    payload_valid = isinstance(sample_result, dict) and not payload_error

    # An invalid payload is withheld from the evaluator, so the review is
    # built as if no sample result had been supplied.
    reviewable = sample_result if payload_valid else None
    review = build_ai_controlled_sample_review_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=reviewable,
    )

    # Copy before appending so the nested review lists stay untouched.
    blocked_reasons = [*review["blocked_reasons"]]
    review_findings = [*review["review_findings"]]
    if not payload_valid:
        blocked_reasons.append("sample_result_payload_invalid")
        review_findings.append(
            {
                "key": "sample_result_payload_invalid",
                "severity": "block",
                "label": "POST body 必須是單一 sample result JSON object 或 sample_result object",
            }
        )

    preview = dict(review)
    preview.update(
        {
            "mode": f"{sample_payload_key('review_evaluation')}_preview",
            "review_request_type": "operator_posted_json",
            "payload_received": payload_received,
            "payload_valid_json_object": payload_valid,
            "payload_error": payload_error,
            "payload_persisted": False,
            "sample_result_persisted": False,
            "candidate_preview_payload_created": bool(
                review["ready_for_candidate_preview"]
            ),
            "candidate_preview_persisted": False,
            "blocked_reasons": blocked_reasons,
            "review_findings": review_findings,
            "safe_boundaries": [
                *review["safe_boundaries"],
                "do_not_echo_full_sample_payload",
                "do_not_persist_posted_review_payload",
            ],
        }
    )
    return preview
def _accepted_candidates_from_sample(sample_result, acceptance_contract, limit):
    """Return up to *limit* normalized candidates whose band is accepted.

    Args:
        sample_result: Sample result payload; a non-dict yields no candidates.
        acceptance_contract: Contract used to resolve the accepted bands.
        limit: Upper bound applied by slicing the filtered list.

    Returns:
        A list of normalized candidate dicts, in their original order.
    """
    diagnostics = {}
    if isinstance(sample_result, dict):
        diagnostics = sample_result.get("diagnostics")
    if not isinstance(diagnostics, dict):
        diagnostics = {}

    raw_candidates = diagnostics.get("campaign_link_candidates")
    if not isinstance(raw_candidates, list):
        raw_candidates = []

    effective = _thresholds(acceptance_contract)
    accepted_bands = set(effective["accepted_candidate_bands"])

    kept = []
    for raw_item in raw_candidates:
        entry = _normalize_candidate(raw_item)
        if entry["confidence_band"] in accepted_bands:
            kept.append(entry)
    return kept[:limit]
def build_ai_controlled_sample_candidate_handoff_preview(
    *,
    runtime_status,
    acceptance_contract,
    sample_result=None,
    payload_error=None,
    limit=20,
):
    """Build the candidate-campaign handoff preview for a sample result.

    Only a preview payload is produced; nothing is stored.

    Args:
        runtime_status: Passed through to the review evaluation preview.
        acceptance_contract: Passed through to the review evaluation preview
            and used to select the accepted candidates.
        sample_result: The posted sample result payload.
        payload_error: Parse error reported by the caller, if any.
        limit: Requested number of candidates; clamped into 1..50, with 20
            used when the value cannot be converted to an int.

    Returns:
        A dict holding a condensed copy of the review, the payload flags, the
        handoff summary, the candidate preview contract, the candidate list,
        operator next actions and the safe boundaries.
    """
    requested_limit = _as_int(limit, 20)
    safe_limit = max(1, min(requested_limit, 50))

    review = build_ai_controlled_sample_review_evaluation_preview(
        runtime_status=runtime_status,
        acceptance_contract=acceptance_contract,
        sample_result=sample_result,
        payload_error=payload_error,
    )
    handoff_ready = all(
        (
            review["payload_valid_json_object"],
            review["sample_result_accepted"],
            review["ready_for_candidate_preview"],
        )
    )

    candidates = []
    if handoff_ready:
        # payload_valid_json_object guarantees sample_result is a dict here.
        platform_code = str(sample_result.get("platform_code") or "")
        source_key = str(sample_result.get("source_key") or "")
        source_url = str(sample_result.get("source_url") or "")
        accepted = _accepted_candidates_from_sample(
            sample_result,
            acceptance_contract,
            safe_limit,
        )
        # Ranks are 1-based and follow the order of the accepted candidates.
        candidates = [
            {
                "candidate_key": (
                    f"{platform_code}:{source_key}:{rank}:"
                    f"{entry['confidence_band']}:{entry['score']}"
                ),
                "platform_code": platform_code,
                "source_key": source_key,
                "source_url": source_url,
                "candidate_url": entry["url"],
                "candidate_text": entry["text"],
                "confidence_band": entry["confidence_band"],
                "score": entry["score"],
                "rank_position": rank,
                "review_status": "needs_operator_review",
                "write_status": "blocked_preview_only",
                "import_allowed": False,
            }
            for rank, entry in enumerate(accepted, start=1)
        ]

    blocked_reasons = [*review["blocked_reasons"]]
    if not handoff_ready:
        blocked_reasons.append("candidate_handoff_not_ready")
    # Persisting the handoff stays blocked regardless of readiness.
    blocked_reasons.append("candidate_handoff_persist_still_blocked")

    review_digest = {
        "mode": review["mode"],
        "review_result": review["review_result"],
        "sample_result_accepted": review["sample_result_accepted"],
        "ready_for_candidate_preview": review["ready_for_candidate_preview"],
        "review_findings": review["review_findings"],
    }
    handoff_summary = {
        "candidate_count": len(candidates),
        "limit": safe_limit,
        "review_status": "needs_operator_review" if candidates else "blocked",
        "import_allowed": False,
    }
    candidate_preview_contract = {
        "required_fields": [
            "candidate_key",
            "platform_code",
            "source_key",
            "source_url",
            "candidate_url",
            "candidate_text",
            "confidence_band",
            "score",
            "rank_position",
            "review_status",
        ],
        "forbidden_actions": [
            "insert_market_campaigns",
            "insert_market_campaign_products",
            "create_crawler_run",
            "auto_import_candidates",
        ],
    }
    operator_next_actions = [
        {
            "key": "review_candidate_urls",
            "label": "AI 受控檢查候選活動 URL、文字與信心分級",
            "write_status": "blocked",
        },
        {
            "key": "promote_to_candidate_review_queue_later",
            "label": "後續需另行批准才可建立候選審核 queue",
            "write_status": "blocked",
        },
    ]
    safe_boundaries = [
        "do_not_fetch_external_pages_from_handoff_api",
        "do_not_persist_candidate_handoff_payload",
        "do_not_import_candidates_from_handoff_preview",
        "do_not_write_market_tables_from_handoff_preview",
        "do_not_attach_scheduler_from_handoff_preview",
        "do_not_touch_momo_db_lifecycle",
    ]

    return {
        "mode": f"{sample_payload_key('candidate_handoff')}_preview",
        "review": review_digest,
        "payload_received": review["payload_received"],
        "payload_valid_json_object": review["payload_valid_json_object"],
        "payload_error": review["payload_error"],
        "payload_persisted": False,
        "sample_result_persisted": False,
        "handoff_ready": handoff_ready,
        "candidate_handoff_created": bool(candidates),
        "candidate_handoff_persisted": False,
        "candidate_import_allowed": False,
        "external_network_executed": False,
        "database_connection_opened": False,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "handoff_summary": handoff_summary,
        "candidate_preview_contract": candidate_preview_contract,
        "candidates": candidates,
        "operator_next_actions": operator_next_actions,
        "safe_boundaries": safe_boundaries,
    }
# Register extra module-level aliases for the four public functions above.
# Each alias name is assembled at import time from sample_payload_key(...), so
# the exact names depend on what that helper returns (not visible in this file).
# The aliases are bound to the same function objects, not to wrappers.
globals()["evaluate_" + sample_payload_key("result")] = (
evaluate_ai_controlled_sample_result
)
globals()["build_" + sample_payload_key("review") + "_preview"] = (
build_ai_controlled_sample_review_preview
)
globals()["build_" + sample_payload_key("review_evaluation") + "_preview"] = (
build_ai_controlled_sample_review_evaluation_preview
)
globals()["build_" + sample_payload_key("candidate_handoff") + "_preview"] = (
build_ai_controlled_sample_candidate_handoff_preview
)