Files
ewoooc/services/market_intel/candidate_queue_review_handoff.py
ogt 08d9e3fe7d
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除市場情報 P3 相容人工語意
2026-07-01 18:24:51 +08:00

259 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""候選審核 queue review handoff preview。
本模組只在 writer run closeout 通過後整理AI 例外決策交接契約;
不查 DB、不更新 review_state、不讀 approval token、不執行 CLI、不掛 scheduler。
"""
from services.market_intel.ai_controlled_service_compat import compatibility_flag
_QUEUE_REVIEW_AND_INVENTORY_COMPAT_PHASE = compatibility_flag(
"queue_review_and_live_inventory_read_only"
)
_READY_FOR_QUEUE_REVIEW_COMPAT_KEY = "ready_for_" + compatibility_flag(
"queue_review"
)
FORBIDDEN_TOKEN_KEYWORDS = (
"approval_token",
"approval-token",
"market_intel_queue_write_approval",
)
SAFE_TOKEN_METADATA_KEYS = {
"approval_token_present",
"approval_token_valid",
"approval_token_secret_configured",
}
SAFE_APPROVAL_ENV_VAR = "MARKET_INTEL_QUEUE_WRITE_APPROVAL"
def _as_dict(value):
return value if isinstance(value, dict) else {}
def _as_list(value):
if value is None:
return []
if isinstance(value, (list, tuple, set)):
return list(value)
return [value]
def _contains_forbidden_token_key(value):
if isinstance(value, dict):
for key, nested in value.items():
normalized_key = str(key).lower()
if normalized_key in SAFE_TOKEN_METADATA_KEYS and isinstance(nested, bool):
continue
if normalized_key == "approval_env_var" and nested == SAFE_APPROVAL_ENV_VAR:
continue
if any(token_key in normalized_key for token_key in FORBIDDEN_TOKEN_KEYWORDS):
return True
if _contains_forbidden_token_key(nested):
return True
elif isinstance(value, list):
return any(_contains_forbidden_token_key(item) for item in value)
return False
def _dedupe_keys(transaction_preview, closeout):
keys = []
for statement in _as_list(_as_dict(transaction_preview).get("statements")):
lookup = _as_dict(statement.get("lookup"))
parameter_preview = _as_dict(statement.get("parameter_preview"))
dedupe_key = lookup.get("dedupe_key") or parameter_preview.get("dedupe_key")
if dedupe_key:
keys.append(str(dedupe_key))
if not keys:
keys = _as_list(_as_dict(_as_dict(closeout).get("receipt_summary")).get("expected_dedupe_keys"))
return sorted(set(str(key) for key in keys if key))
def _closeout_summary(closeout):
closeout = _as_dict(closeout)
promotion_gate = _as_dict(closeout.get("promotion_gate"))
return {
"provided": bool(closeout),
"mode": closeout.get("mode"),
"closeout_passed": bool(closeout.get("closeout_passed")),
"ready_for_next_manual_phase": bool(closeout.get("ready_for_next_manual_phase")),
"promotion_allowed": bool(promotion_gate.get("allowed")),
"next_manual_phase": promotion_gate.get("next_manual_phase"),
"ready_for_api_database_write": bool(closeout.get("ready_for_api_database_write")),
"ready_for_scheduler_attach": bool(closeout.get("ready_for_scheduler_attach")),
"api_executes_cli": bool(closeout.get("api_executes_cli")),
"api_reads_approval_token": bool(closeout.get("api_reads_approval_token")),
"api_writes_file": bool(closeout.get("api_writes_file")),
"api_writes_database": bool(closeout.get("api_writes_database")),
"database_connection_opened": bool(closeout.get("database_connection_opened")),
"database_write_executed": bool(closeout.get("database_write_executed")),
"database_commit_executed": bool(closeout.get("database_commit_executed")),
"scheduler_attached": bool(closeout.get("scheduler_attached")),
}
def _operator_summary(operator_evidence):
operator_evidence = _as_dict(operator_evidence)
return {
"provided_keys": sorted(operator_evidence.keys()),
"operator_confirmed_queue_review_next": bool(
operator_evidence.get("operator_confirmed_queue_review_next")
),
"operator_confirmed_no_scheduler_attach": bool(
operator_evidence.get("operator_confirmed_no_scheduler_attach")
),
"operator_confirmed_no_api_db_write": bool(
operator_evidence.get("operator_confirmed_no_api_db_write")
),
"approval_token_submitted_to_api": _contains_forbidden_token_key(
operator_evidence
),
}
def _handoff_gates(closeout_summary, operator_summary, expected_dedupe_keys):
return [
{
"key": "closeout_preview_provided",
"label": "必須提供上一階段 closeout preview",
"passed": bool(
closeout_summary["provided"]
and closeout_summary["mode"]
== "candidate_queue_writer_run_closeout_preview"
),
},
{
"key": "closeout_passed",
"label": "closeout 必須通過才可交接AI 例外決策",
"passed": closeout_summary["closeout_passed"],
},
{
"key": "closeout_promotes_manual_queue_review",
"label": "promotion 只能指向 AI 受控 queue review / read-only inventory",
"passed": bool(
closeout_summary["promotion_allowed"]
and closeout_summary["next_manual_phase"]
== _QUEUE_REVIEW_AND_INVENTORY_COMPAT_PHASE
),
},
{
"key": "expected_dedupe_keys_present",
"label": "交接包必須有 expected dedupe key 供AI 自動比對",
"passed": bool(expected_dedupe_keys),
},
{
"key": "closeout_no_api_write_or_scheduler",
"label": "closeout 不得允許 API/UI DB 寫入、CLI 或 scheduler",
"passed": bool(
not closeout_summary["ready_for_api_database_write"]
and not closeout_summary["ready_for_scheduler_attach"]
and not closeout_summary["api_executes_cli"]
and not closeout_summary["api_writes_database"]
and not closeout_summary["database_write_executed"]
and not closeout_summary["database_commit_executed"]
and not closeout_summary["scheduler_attached"]
),
},
{
"key": "operator_confirmed_review_is_manual",
"label": "操作員確認下一步只做 AI 例外決策,不由 API 更新 review_state",
"passed": bool(
operator_summary["operator_confirmed_queue_review_next"]
and operator_summary["operator_confirmed_no_api_db_write"]
and operator_summary["operator_confirmed_no_scheduler_attach"]
),
},
{
"key": "handoff_no_approval_token_submitted_to_api",
"label": "handoff payload 不得包含一次性 approval token key",
"passed": not operator_summary["approval_token_submitted_to_api"],
},
]
def build_candidate_queue_review_handoff(
*,
transaction_preview,
run_closeout,
operator_evidence=None,
):
"""建立 AI 受控 queue review handoff不執行任何副作用。"""
closeout_summary = _closeout_summary(run_closeout)
operator_summary = _operator_summary(operator_evidence)
expected_dedupe_keys = _dedupe_keys(transaction_preview, run_closeout)
gates = _handoff_gates(closeout_summary, operator_summary, expected_dedupe_keys)
blocked_reasons = [gate["key"] for gate in gates if not gate["passed"]]
handoff_ready = bool(not blocked_reasons)
return {
"mode": "candidate_queue_review_handoff_preview",
"target_table": "market_alert_review_queue",
"handoff_ready": handoff_ready,
"ready_for_ai_controlled_queue_review": handoff_ready,
_READY_FOR_QUEUE_REVIEW_COMPAT_KEY: handoff_ready,
"ready_for_live_inventory_read_only": handoff_ready,
"ready_for_api_database_write": False,
"ready_for_scheduler_attach": False,
"api_executes_cli": False,
"api_reads_approval_token": False,
"api_writes_file": False,
"api_writes_database": False,
"api_updates_review_state": False,
"read_only_query_executed": False,
"database_connection_opened": False,
"database_session_created": False,
"explicit_transaction_opened": False,
"database_write_executed": False,
"database_commit_executed": False,
"database_rollback_executed": False,
"external_network_executed": False,
"scheduler_attached": False,
"writes_executed": False,
"would_write_database": False,
"expected_dedupe_keys": expected_dedupe_keys,
"blocked_reasons": blocked_reasons,
"gates": gates,
"closeout_summary": closeout_summary,
"operator_handoff_summary": operator_summary,
"review_contract": {
"expected_review_state": "needs_review",
"allowed_manual_actions": [
"confirm_queue_row_exists",
"inspect_evidence_json",
"compare_dedupe_key",
"record_human_decision_outside_api",
],
"forbidden_api_actions": [
"update_review_state",
"insert_missing_queue_row",
"dispatch_alert",
"attach_scheduler",
],
"required_columns": [
"dedupe_key",
"review_state",
"priority_lane",
"source_url",
"evidence_json",
"created_at",
],
},
"next_operator_steps": [
"以只讀 inventory 或受控唯讀 DB console 確認 queue row 存在",
"比對 expected dedupe key 與 review_state=needs_review",
"只在AI 例外決策流程中記錄 confirmed / rejected / deferred 決策",
"若 queue row 不存在或 evidence 不一致,停回 writer receipt / closeout",
],
"safe_boundaries": [
"do_not_query_database_from_handoff_preview",
"do_not_update_review_state_from_api",
"do_not_insert_missing_queue_row_from_api",
"do_not_read_approval_token_from_api",
"do_not_execute_cli_from_handoff_preview",
"do_not_attach_scheduler_from_handoff_preview",
"no_remove_orphans",
"no_momo_db_lifecycle_change",
],
}