Files
ewoooc/services/market_intel/mcp_fetch_target_review.py
ogt 0a7bdd819b
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除市場情報受控套用剩餘人工語意
2026-07-01 13:53:46 +08:00

562 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 MCP AI-controlled fetch target review preview。
本模組只審核下一段 AI-controlled fetch run package 前的公開目標、節流、樣本數與
回退條件;不發 HTTP request、不開 DB、不保存 payload、不掛 scheduler。
"""
import math

from services.market_intel.adapters import get_adapter_registry
from services.market_intel.ai_controlled_service_compat import (
    compatibility_flag,
    fetch_handoff_key,
    mcp_fetch_handoff_key,
    mcp_fetch_handoff_preview_builder,
    ready_for_fetch_key,
)
# Floor (seconds) for the per-request delay of any target; a platform's own
# request interval may only raise it.
MIN_DELAY_SECONDS: float = 1.0
# Ceiling (seconds) for any per-request timeout accepted in a target review.
MAX_TIMEOUT_SECONDS: int = 30
# Ceiling for the number of samples a single platform target may request.
MAX_SAMPLE_LIMIT: int = 5

# Required acknowledgement key -> human-readable label. Every key must be
# acknowledged for a target review to be accepted, and this insertion order
# is the order in which the per-acknowledgement gates are reported.
TARGET_ACKNOWLEDGEMENT_LABELS = {
    "target_urls_from_adapter_registry": "目標 URL 只能來自 adapter registry 的公開入口",
    "public_pages_only": "只允許公開頁面與公開結構化資料",
    "rate_limit_reviewed": "每平台 delay / timeout / max pages 已由 AI 自動驗證確認",
    "sample_limits_reviewed": "每平台樣本數已由 AI 自動驗證確認且維持小批次",
    "no_login_no_antibot": "不得登入、不得處理會員資料、不得繞反爬或帳號池",
    "no_api_fetch_execution": "本 API 不執行抓取或外部 network request",
    "no_database_write": "本階段不得寫 DB",
    "no_scheduler_attach": "不得掛 scheduler",
}
# Payload keys that, when present with a truthy value anywhere in a reviewed
# payload, mean a network / database / scheduler / shell side effect was
# requested or reported. The recursive scan in _blocked_side_effects tests
# every dict key it visits against this collection, so it is a frozenset
# (O(1) membership) rather than a tuple; ordering was never relied upon.
_BLOCKED_SIDE_EFFECT_KEYS = frozenset(
    (
        "allow_external_network_in_api",
        "api_executes_health_check",
        "api_executes_docker",
        "api_executes_ssh",
        "api_opens_database_connection",
        "api_writes_database",
        "api_uses_external_network",
        "attach_scheduler",
        "database_session_created",
        "database_write_executed",
        "database_commit_executed",
        "external_network_executed",
        "fetch_executed",
        compatibility_flag("fetch_gate_opened_by_api"),
        "network_request_allowed",
        "scheduler_attached",
        "write_database",
        "writes_executed",
        "would_write_database",
    )
)
def _adapter_targets(registry):
return [
{
"platform_code": adapter.platform_code,
"platform_name": adapter.platform_name,
"base_url": adapter.base_url,
"safety_policy": adapter.safety_policy.to_dict(),
"sources": [source.to_dict() for source in adapter.campaign_sources()],
}
for adapter in registry.values()
]
def _sample_target_review(registry):
    """Build an example target-review payload covering every registered adapter.

    Each platform entry clamps the adapter's safety policy to this module's
    limits: at most 2 pages, ``min(3, MAX_SAMPLE_LIMIT)`` samples, a delay of
    at least ``MIN_DELAY_SECONDS`` and a timeout of at most
    ``MAX_TIMEOUT_SECONDS``. Every side-effect flag is closed and an operator
    run command is required. The rollback plan and acknowledgement map are
    filled with values the rollback / acknowledgement checks accept.
    """

    def platform_entry(adapter):
        policy = adapter.safety_policy
        return {
            "platform_code": adapter.platform_code,
            "source_keys": [src.source_key for src in adapter.campaign_sources()],
            "public_pages_only": True,
            "max_pages": min(2, policy.max_pages_per_run),
            "sample_limit": min(3, MAX_SAMPLE_LIMIT),
            "delay_seconds": max(MIN_DELAY_SECONDS, policy.request_interval_sec),
            "timeout_seconds": min(policy.timeout_sec, MAX_TIMEOUT_SECONDS),
            "allow_external_network_in_api": False,
            "write_database": False,
            "attach_scheduler": False,
            "requires_operator_run_command": True,
        }

    return {
        "platform_targets": [
            platform_entry(adapter) for adapter in registry.values()
        ],
        "rollback_plan": {
            "feature_flag_kill_switch": True,
            "stop_after_error_count": 2,
            "preserve_run_log_only": True,
            "no_db_rollback_needed": True,
        },
        "operator_acknowledgements": dict.fromkeys(
            TARGET_ACKNOWLEDGEMENT_LABELS, True
        ),
    }
def _sample_target_review_package(registry):
    """Pair the handoff builder's sample handoff package with a sample target review.

    Returns:
        ``{"handoff_package": ..., "target_review": ...}`` where the handoff
        part is the ``sample_handoff_package`` entry of a freshly built
        handoff preview.
    """
    build_handoff_preview = mcp_fetch_handoff_preview_builder()
    preview = build_handoff_preview()
    sample_handoff = preview["sample_handoff_package"]
    return {
        "handoff_package": sample_handoff,
        "target_review": _sample_target_review(registry),
    }
def _target_review_from_input(target_review):
return target_review if isinstance(target_review, dict) else {}
def _handoff_review_from_inputs(handoff_package, handoff_review, phase):
    """Resolve the handoff review that the target review is gated on.

    A non-empty dict ``handoff_review`` is returned unchanged; no re-review
    happens here (NOTE(review): its accepted flag is therefore trusted as
    supplied; confirm callers only pass server-produced reviews). Otherwise
    the handoff preview builder is run on the ``promotion_package``,
    ``promotion_review`` and ``operator_acknowledgements`` entries of
    ``handoff_package``, which is treated as ``{}`` when it is not a dict.
    """
    if isinstance(handoff_review, dict) and handoff_review:
        return handoff_review
    package = handoff_package if isinstance(handoff_package, dict) else {}
    build_preview = mcp_fetch_handoff_preview_builder()
    return build_preview(
        promotion_package=package.get("promotion_package", {}),
        promotion_review=package.get("promotion_review"),
        operator_acknowledgements=package.get("operator_acknowledgements", {}),
        phase=phase,
    )
def _blocked_side_effects(payload):
    """Collect the paths of blocked side-effect keys that carry a truthy value.

    Nested dicts and lists are walked depth-first in iteration order. Dict
    keys are joined with ``.`` and list positions are appended as ``[i]``.
    A key listed in ``_BLOCKED_SIDE_EFFECT_KEYS`` whose value is truthy
    contributes its path; its value is descended into either way.

    Returns:
        A list of paths (empty when nothing is flagged).
    """

    def iter_flagged(node, prefix):
        if isinstance(node, dict):
            for name, child in node.items():
                child_path = f"{prefix}.{name}" if prefix else name
                if name in _BLOCKED_SIDE_EFFECT_KEYS and child:
                    yield child_path
                yield from iter_flagged(child, child_path)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from iter_flagged(child, f"{prefix}[{position}]")

    return list(iter_flagged(payload, ""))
def _build_known_source_index(registry):
return {
code: {
source.source_key
for source in adapter.campaign_sources()
}
for code, adapter in registry.items()
}
def _to_positive_float(value):
try:
parsed = float(value)
except (TypeError, ValueError):
return None
if parsed <= 0:
return None
return parsed
def _to_positive_int(value):
try:
parsed = int(value)
except (TypeError, ValueError):
return None
if parsed <= 0:
return None
return parsed
def _normalize_source_keys(raw_source_keys):
    """Coerce a target's ``source_keys`` field into a list of non-empty strings.

    A single string becomes a one-element list, and lists/tuples are kept.
    Any other value (``None``, numbers, bools, dicts, ...) is malformed and
    yields ``[]``. The platform is then reported under ``missing_sources``
    instead of the review crashing with ``TypeError`` on a non-iterable.
    """
    if isinstance(raw_source_keys, str):
        raw_source_keys = [raw_source_keys]
    elif not isinstance(raw_source_keys, (list, tuple)):
        return []
    return [str(item) for item in raw_source_keys if item]


def _review_targets(target_review, registry):
    """Validate every ``platform_targets`` entry against the adapter registry.

    Args:
        target_review: target-review payload dict. Its ``platform_targets``
            value is reviewed; a non-list value is treated as empty.
        registry: mapping of platform code -> adapter.

    Returns:
        A dict with per-target ``summaries``, ``target_count``,
        ``source_target_count`` and the offending-entry lists
        ``unknown_platforms``, ``unknown_sources``, ``missing_sources``,
        ``unsafe_platforms``, ``rate_limit_violations`` and
        ``sample_limit_violations``.
    """
    known_sources = _build_known_source_index(registry)
    platform_targets = target_review.get("platform_targets", [])
    if not isinstance(platform_targets, list):
        platform_targets = []
    summaries = []
    unknown_platforms = []
    unknown_sources = []
    missing_sources = []
    unsafe_platforms = []
    rate_limit_violations = []
    sample_limit_violations = []
    for target in platform_targets:
        if not isinstance(target, dict):
            # Not reviewable: flagged as unsafe, but no summary entry is added.
            unsafe_platforms.append("invalid_target_payload")
            continue
        platform_code = str(target.get("platform_code") or "").lower()
        adapter = registry.get(platform_code)
        source_keys = _normalize_source_keys(target.get("source_keys"))
        if not adapter:
            unknown_platforms.append(platform_code or "missing_platform_code")
            summaries.append(
                {
                    "platform_code": platform_code,
                    "source_keys": source_keys,
                    "known_platform": False,
                    "ready": False,
                }
            )
            continue
        allowed_sources = known_sources.get(platform_code, set())
        missing = [key for key in source_keys if key not in allowed_sources]
        if missing:
            unknown_sources.extend(
                {
                    "platform_code": platform_code,
                    "source_key": key,
                }
                for key in missing
            )
        if not source_keys:
            missing_sources.append(platform_code)
        policy = adapter.safety_policy
        delay = _to_positive_float(target.get("delay_seconds"))
        timeout = _to_positive_int(target.get("timeout_seconds"))
        max_pages = _to_positive_int(target.get("max_pages"))
        sample_limit = _to_positive_int(target.get("sample_limit"))
        # The platform's own request interval may raise, never lower, the floor.
        min_delay = max(MIN_DELAY_SECONDS, policy.request_interval_sec)
        rate_ok = bool(
            delay is not None
            and delay >= min_delay
            and timeout is not None
            and timeout <= MAX_TIMEOUT_SECONDS
        )
        sample_ok = bool(
            max_pages is not None
            and max_pages <= policy.max_pages_per_run
            and sample_limit is not None
            and sample_limit <= MAX_SAMPLE_LIMIT
        )
        # Opt-in flags must be literally True; side-effect flags must be falsy.
        public_only = target.get("public_pages_only") is True
        api_network_closed = not target.get("allow_external_network_in_api")
        db_closed = not target.get("write_database")
        scheduler_closed = not target.get("attach_scheduler")
        operator_run_required = target.get("requires_operator_run_command") is True
        if not rate_ok:
            rate_limit_violations.append(platform_code)
        if not sample_ok:
            sample_limit_violations.append(platform_code)
        if not (
            public_only
            and api_network_closed
            and db_closed
            and scheduler_closed
            and operator_run_required
        ):
            unsafe_platforms.append(platform_code)
        summaries.append(
            {
                "platform_code": platform_code,
                "platform_name": adapter.platform_name,
                "base_url": adapter.base_url,
                "source_keys": source_keys,
                "known_platform": True,
                "unknown_source_keys": missing,
                "public_pages_only": public_only,
                "delay_seconds": delay,
                "min_delay_seconds": min_delay,
                "timeout_seconds": timeout,
                "max_pages": max_pages,
                "max_pages_per_policy": policy.max_pages_per_run,
                "sample_limit": sample_limit,
                "max_sample_limit": MAX_SAMPLE_LIMIT,
                "api_network_closed": api_network_closed,
                "database_write_closed": db_closed,
                "scheduler_attach_closed": scheduler_closed,
                "operator_run_required": operator_run_required,
                "rate_limit_ok": rate_ok,
                "sample_limit_ok": sample_ok,
                "ready": bool(
                    not missing
                    and source_keys
                    and public_only
                    and api_network_closed
                    and db_closed
                    and scheduler_closed
                    and operator_run_required
                    and rate_ok
                    and sample_ok
                ),
            }
        )
    return {
        "summaries": summaries,
        "target_count": len(summaries),
        "source_target_count": sum(
            len(item.get("source_keys", [])) for item in summaries
        ),
        "unknown_platforms": unknown_platforms,
        "unknown_sources": unknown_sources,
        "missing_sources": missing_sources,
        "unsafe_platforms": unsafe_platforms,
        "rate_limit_violations": rate_limit_violations,
        "sample_limit_violations": sample_limit_violations,
    }
def _acknowledgement_status(target_review):
    """Report, for each required acknowledgement key, its label and whether it was given.

    Only a literal ``True`` counts as acknowledged. This matches the
    ``is True`` strictness applied to ``public_pages_only`` and the
    rollback-plan flags, so values such as ``"false"``, ``"no"`` or ``1``
    are not mistaken for operator consent. A non-dict
    ``operator_acknowledgements`` value is treated as empty.

    Returns:
        ``{key: {"label": str, "acknowledged": bool}}`` in
        ``TARGET_ACKNOWLEDGEMENT_LABELS`` order.
    """
    acknowledgements = target_review.get("operator_acknowledgements", {})
    if not isinstance(acknowledgements, dict):
        acknowledgements = {}
    return {
        key: {
            "label": label,
            "acknowledged": acknowledgements.get(key) is True,
        }
        for key, label in TARGET_ACKNOWLEDGEMENT_LABELS.items()
    }
def _rollback_plan_confirmed(target_review):
    """Return True only for a complete rollback plan.

    The ``rollback_plan`` entry must be a dict. Its ``feature_flag_kill_switch``,
    ``preserve_run_log_only`` and ``no_db_rollback_needed`` must be literally
    ``True``. Its ``stop_after_error_count`` must parse, via
    ``_to_positive_int``, to a value no greater than 3.
    """
    plan = target_review.get("rollback_plan", {})
    if not isinstance(plan, dict):
        return False
    stop_after = _to_positive_int(plan.get("stop_after_error_count"))
    required_flags = (
        "feature_flag_kill_switch",
        "preserve_run_log_only",
        "no_db_rollback_needed",
    )
    if not all(plan.get(flag) is True for flag in required_flags):
        return False
    return stop_after is not None and stop_after <= 3
def build_mcp_fetch_target_review_preview(
    *,
    handoff_package=None,
    handoff_review=None,
    target_review=None,
    phase=None,
):
    """Build the AI-controlled fetch target review; it only reports results and never fetches.

    Keyword Args:
        handoff_package: raw handoff package. It is re-reviewed through the
            handoff preview builder unless ``handoff_review`` is a non-empty
            dict; non-dict values are treated as ``{}``.
        handoff_review: an already-reviewed handoff result, used as-is when
            it is a non-empty dict (NOTE(review): its accepted flag is then
            trusted as supplied).
        target_review: the platform target / throttling / sample / rollback
            payload under review; non-dict values are treated as empty.
        phase: echoed in the result and forwarded to the handoff builder.

    Returns:
        A dict with every gate and its outcome, the overall verdict
        (``mcp_fetch_target_review_accepted``), per-platform summaries, the
        adapter targets and a sample package. Every execution / side-effect
        flag in the result is hard-coded to ``False``.
    """
    registry = get_adapter_registry()
    handoff_package = handoff_package if isinstance(handoff_package, dict) else {}
    handoff = _handoff_review_from_inputs(handoff_package, handoff_review, phase)

    review_payload = _target_review_from_input(target_review)
    payload_received = bool(review_payload)
    # A missing payload is not "malformed"; only a non-dict value is.
    payload_is_object = target_review is None or isinstance(target_review, dict)

    report = _review_targets(review_payload, registry)
    ack_status = _acknowledgement_status(review_payload)
    acks_complete = all(entry["acknowledged"] for entry in ack_status.values())
    payload_side_effects = _blocked_side_effects(review_payload)
    handoff_side_effects = _blocked_side_effects(handoff)

    handoff_key = fetch_handoff_key()
    handoff_accepted_key = f"{handoff_key}_accepted"
    handoff_accepted = bool(handoff.get(handoff_accepted_key))

    # Top-level side-effect requests in the target payload, grouped by gate.
    fetch_gate_flag = compatibility_flag("fetch_gate_opened_by_api")
    no_api_fetch_execution = not any(
        review_payload.get(flag)
        for flag in (
            "allow_external_network_in_api",
            "fetch_executed",
            "network_request_allowed",
            fetch_gate_flag,
        )
    )
    no_database_write = not any(
        review_payload.get(flag)
        for flag in (
            "write_database",
            "api_writes_database",
            "database_write_executed",
        )
    )
    no_scheduler_attach = not any(
        review_payload.get(flag)
        for flag in ("attach_scheduler", "scheduler_attached")
    )

    def gate(key, passed, label):
        return {"key": key, "passed": passed, "label": label}

    target_count = report["target_count"]
    gates = [
        gate(
            f"{handoff_key}_payload_or_review_received",
            bool(handoff_package or handoff_review),
            "已提供 AI-controlled fetch handoff package 或已審核結果",
        ),
        gate(
            handoff_accepted_key,
            handoff_accepted,
            "AI-controlled fetch handoff 已通過,可進入目標審核",
        ),
        gate(
            "target_payload_received",
            payload_received,
            "已提供平台目標、節流、樣本數與回退計畫",
        ),
        gate(
            "target_payload_valid_object",
            payload_is_object,
            "target review payload 必須是 JSON object",
        ),
        gate(
            "adapter_targets_present",
            bool(registry),
            "adapter registry 內已有可審核公開入口",
        ),
        gate(
            "all_target_platforms_known",
            bool(target_count and not report["unknown_platforms"]),
            "所有目標平台都存在於 adapter registry",
        ),
        gate(
            "all_source_keys_known",
            bool(
                target_count
                and not report["unknown_sources"]
                and not report["missing_sources"]
            ),
            "所有來源 key 都存在於對應平台公開入口白名單",
        ),
        gate(
            "public_pages_only_confirmed",
            bool(target_count and not report["unsafe_platforms"]),
            "每個平台都確認公開頁面、人工命令、API 不抓取/不寫入/不掛排程",
        ),
        gate(
            "rate_limits_within_policy",
            bool(target_count and not report["rate_limit_violations"]),
            "每平台 delay 與 timeout 未突破安全政策",
        ),
        gate(
            "sample_limits_within_policy",
            bool(target_count and not report["sample_limit_violations"]),
            "每平台 max_pages 與 sample_limit 維持小批次",
        ),
        gate(
            "rollback_plan_confirmed",
            _rollback_plan_confirmed(review_payload),
            "已確認 kill switch、錯誤停止門檻、只保留 run log 與無 DB rollback",
        ),
        gate(
            "operator_acknowledgements_complete",
            acks_complete,
            "操作員已確認來源、公開頁面、節流、樣本數、無登入/反爬與無副作用",
        ),
        gate(
            "no_api_fetch_execution",
            no_api_fetch_execution,
            "本 API 不執行 fetch、不允許 external network",
        ),
        gate(
            "no_database_write",
            no_database_write,
            "本階段不寫 DB、不開 commit",
        ),
        gate(
            "no_scheduler_attach",
            no_scheduler_attach,
            "本階段不掛 scheduler",
        ),
        gate(
            "target_review_side_effect_free",
            not (payload_side_effects or handoff_side_effects),
            "target review 與 handoff 均未帶入任何執行/寫入/排程副作用旗標",
        ),
    ]
    # One extra gate per required acknowledgement, in label order.
    gates += [
        gate(f"ack_{key}", entry["acknowledged"], entry["label"])
        for key, entry in ack_status.items()
    ]

    blocked_reasons = [entry["key"] for entry in gates if not entry["passed"]]
    accepted = bool(payload_received and not blocked_reasons)
    mode = (
        "mcp_fetch_target_review"
        if payload_received
        else "mcp_fetch_target_review_preview"
    )
    gate_operator_key = ready_for_fetch_key("gate_operator_review")

    return {
        "mode": mode,
        "phase": phase,
        "target_payload_received": payload_received,
        "target_payload_valid_object": payload_is_object,
        handoff_accepted_key: handoff_accepted,
        "operator_acknowledgements_complete": acks_complete,
        "mcp_fetch_target_review_accepted": accepted,
        ready_for_fetch_key("run_package_review"): accepted,
        fetch_gate_flag: False,
        "network_request_allowed": False,
        "fetch_executed": False,
        "database_write_executed": False,
        "scheduler_attached": False,
        "adapter_target_count": len(registry),
        "platform_target_count": target_count,
        "source_target_count": report["source_target_count"],
        "unknown_platform_codes": report["unknown_platforms"],
        "unknown_source_keys": report["unknown_sources"],
        "missing_source_platforms": report["missing_sources"],
        "unsafe_platform_targets": report["unsafe_platforms"],
        "rate_limit_violations": report["rate_limit_violations"],
        "sample_limit_violations": report["sample_limit_violations"],
        "blocked_side_effects": payload_side_effects + handoff_side_effects,
        "gate_count": len(gates),
        "passed_gate_count": len(gates) - len(blocked_reasons),
        "blocked_reasons": blocked_reasons,
        "gates": gates,
        "operator_acknowledgement_status": ack_status,
        "target_summaries": report["summaries"],
        "adapter_targets": _adapter_targets(registry),
        "handoff_summary": {
            "mode": handoff.get("mode"),
            "accepted": handoff_accepted,
            "passed_gate_count": handoff.get("passed_gate_count", 0),
            "gate_count": handoff.get("gate_count", 0),
            "blocked_reasons": handoff.get("blocked_reasons", []),
            gate_operator_key: bool(handoff.get(gate_operator_key)),
        },
        mcp_fetch_handoff_key(): handoff,
        "sample_target_review_package": _sample_target_review_package(registry),
        "next_operator_steps": [
            "若 target review 通過,只代表可整理人工 fetch run package不代表 API 可抓取",
            "操作員需以獨立命令與 run receipt 回報實際公開頁面抓取結果",
            "正式 DB write、scheduler attach、Telegram/AI 摘要仍需各自獨立 gate",
        ],
        # This preview never persists, connects, writes or calls out.
        **dict.fromkeys(
            (
                "payload_persisted",
                "target_review_persisted",
                "api_executes_health_check",
                "api_executes_docker",
                "api_executes_ssh",
                "api_opens_database_connection",
                "api_writes_database",
                "api_uses_external_network",
                "database_session_created",
                "database_commit_executed",
                "external_network_executed",
                "writes_executed",
                "would_write_database",
            ),
            False,
        ),
    }