Files
ewoooc/services/market_intel/mcp_professional_source_governance.py
ogt 0a7bdd819b
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除市場情報受控套用剩餘人工語意
2026-07-01 13:53:46 +08:00

392 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報專業來源治理 gate。
本模組把主流市場資料採集做法轉成可審核合約:
robots/REP、sitemap、structured data、canonical URL、rate limit、
公開資料邊界、provenance、snapshot hash 與 idempotency。
API/UI 只審核操作員提供的治理摘要;不抓外站、不讀 robots/sitemap、
不開 DB、不寫檔、不掛 scheduler。
"""
import math
from urllib.parse import urlparse

from services.market_intel.mcp_fetch_candidate_queue_writer_post_closeout_inventory_review import (
    FORBIDDEN_SECRET_KEYS,
    SAFE_SECRET_METADATA_KEYS,
    _as_dict,
    _as_list,
    _blocked_side_effects,
    _contains_forbidden_key,
    _safe_int,
    _safe_path,
    _safe_text,
)
from services.market_intel.mcp_fetch_candidate_queue_writer_run_readiness import (
    ARTIFACT_PREFIX,
)
from services.market_intel.mcp_professional_source_governance_gates import (
    CONTRACT_SCOPE,
    SOURCE_POLICY_VERSION,
    _is_public_http_url,
    build_professional_source_governance_gates,
)
from services.market_intel.mcp_professional_source_governance_sample import (
    build_sample_professional_source_governance_package,
)
# Payload keys that signal a requested or claimed side effect (network fetch,
# database/file write, scheduler attach, execution). They are compared against
# the lower-cased key of every dict entry in a submitted package by
# ``_blocked_source_governance_side_effects``; an entry only counts when its
# value is truthy.
_BLOCKED_SOURCE_GOVERNANCE_SIDE_EFFECT_KEYS = (
    "allow_api_database_write",
    "allow_api_execution",
    "allow_api_file_write",
    "allow_api_network_fetch",
    "allow_database_write",
    "allow_external_network",
    "allow_scheduler_attach",
    "api_fetches_robots_txt",
    "api_fetches_sitemap",
    "api_fetches_source_url",
    "api_opens_database_connection",
    "api_uses_external_network",
    "api_writes_database",
    "api_writes_file",
    "database_commit_executed",
    "database_write_executed",
    "external_network_executed",
    "fetch_executed",
    "file_written",
    "network_request_allowed",
    "payload_persisted",
    "ready_for_api_database_write",
    "real_write_allowed_by_api",
    "scheduler_attached",
    "write_database",
    "writes_executed",
    "would_write_database",
)
# Keys treated as carrying raw page / response content. ``_contains_raw_payload``
# compares them against the lower-cased key of every dict entry and flags the
# submission when such an entry has a truthy value.
_RAW_PAYLOAD_KEYS = (
    "body_html",
    "full_response_body",
    "html",
    "page_body",
    "page_html",
    "raw_html",
    "raw_page_html",
    "response_body",
)
def _safe_float(value):
    """Coerce *value* to a finite float, falling back to ``0.0``.

    Falsy inputs (``None``, ``""``, ``0``, empty containers) become ``0.0``.
    Anything ``float()`` cannot convert -- a wrong type, unparsable text, or
    an integer too large for a double -- also yields ``0.0``.

    NaN and +/-infinity are mapped to ``0.0`` as well: NaN compares false
    against everything (which makes ``min``/``max`` aggregates order
    dependent) and neither value is representable in standard JSON.
    """
    try:
        result = float(value or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: e.g. a JSON integer with hundreds of digits.
        return 0.0
    return result if math.isfinite(result) else 0.0
def _contains_raw_payload(value):
    """Report whether *value* carries a raw page/response body anywhere inside.

    Dicts and lists are searched recursively. A hit is a dict entry whose
    lower-cased key is listed in ``_RAW_PAYLOAD_KEYS`` and whose value is
    truthy. Any other type of value is never a hit on its own.
    """
    if isinstance(value, dict):
        return any(
            (str(name).lower() in _RAW_PAYLOAD_KEYS and bool(child))
            or _contains_raw_payload(child)
            for name, child in value.items()
        )
    if isinstance(value, list):
        for element in value:
            if _contains_raw_payload(element):
                return True
    return False
def _blocked_source_governance_side_effects(payload):
    """Return the sorted, de-duplicated side-effect findings for *payload*.

    The result merges whatever ``_blocked_side_effects(payload)`` reports with
    the paths found by a local recursive walk. The walk descends through dicts
    and lists and records the path of every dict entry whose lower-cased key is
    in ``_BLOCKED_SOURCE_GOVERNANCE_SIDE_EFFECT_KEYS`` and whose value is
    truthy. Paths use ``parent.child`` for dict keys and ``parent[index]`` for
    list positions; a key directly under the root is reported as-is.
    """

    def _walk(node, prefix):
        # Yield the path of each offending entry found at or below ``node``.
        if isinstance(node, dict):
            for name, child in node.items():
                location = f"{prefix}.{name}" if prefix else name
                if (
                    str(name).lower()
                    in _BLOCKED_SOURCE_GOVERNANCE_SIDE_EFFECT_KEYS
                    and bool(child)
                ):
                    yield location
                yield from _walk(child, location)
        elif isinstance(node, list):
            for position, element in enumerate(node):
                yield from _walk(element, f"{prefix}[{position}]")

    findings = set(_blocked_side_effects(payload))
    findings.update(_walk(payload, ""))
    return sorted(findings)
def _normalize_host(value):
    """Return the lower-cased network location of URL *value*, or ``None``.

    The result is ``urlparse(value).netloc`` lower-cased, so it still includes
    any port or userinfo present in the URL. ``None`` is returned when *value*
    is empty, is not a string, has no network location, or is rejected by
    ``urlparse`` as malformed.
    """
    if not value or not isinstance(value, str):
        return None
    try:
        netloc = urlparse(value).netloc
    except ValueError:
        # urlparse raises on malformed authorities such as unbalanced IPv6
        # brackets ("http://[::1"); treat those as "no usable host".
        return None
    return netloc.lower() or None
def _source_summary(source):
    """Flatten one operator-supplied source entry into a summary dict.

    *source* is first passed through ``_as_dict``. Text fields go through
    ``_safe_text`` (the numeric argument is presumably a length cap -- confirm
    against the helper), numeric fields through ``_safe_int`` /
    ``_safe_float``, and policy flags are reduced to plain booleans. The four
    URL fields each get a ``*_safe`` companion from ``_is_public_http_url``;
    ``canonical_host_matches_source`` is true only when both URLs yield a host
    and the hosts are equal; ``evidence_artifact_path_safe`` is the result of
    ``_safe_path`` restricted to ``ARTIFACT_PREFIX`` and a ``.json`` suffix.
    """
    entry = _as_dict(source)
    field = entry.get

    def flag(name):
        # Truthiness of the raw field, as a real bool.
        return bool(field(name))

    source_url = _safe_text(field("source_url"), 500)
    canonical_url = _safe_text(field("canonical_url"), 500)
    robots_url = _safe_text(field("robots_url"), 500)
    sitemap_url = _safe_text(field("sitemap_url"), 500)

    # Keep only the type names that survive sanitising as non-empty text.
    structured_data_types = []
    for raw_type in _as_list(field("structured_data_types")):
        type_name = _safe_text(raw_type, 80)
        if type_name:
            structured_data_types.append(type_name)

    max_requests = _safe_int(field("max_requests_per_run"))
    crawl_delay_seconds = _safe_float(field("crawl_delay_seconds"))
    evidence_artifact_path = _safe_text(field("evidence_artifact_path"))

    source_host = _normalize_host(source_url)
    canonical_host = _normalize_host(canonical_url)
    hosts_match = bool(source_host and source_host == canonical_host)

    summary = {
        "platform_code": _safe_text(field("platform_code"), 80),
        "source_key": _safe_text(field("source_key"), 160),
        "source_url": source_url,
        "canonical_url": canonical_url,
        "robots_url": robots_url,
        "sitemap_url": sitemap_url,
        "lastmod_source": _safe_text(field("lastmod_source"), 160),
        "source_url_safe": _is_public_http_url(source_url),
        "canonical_url_safe": _is_public_http_url(canonical_url),
        "robots_url_safe": _is_public_http_url(robots_url),
        "sitemap_url_safe": _is_public_http_url(sitemap_url),
        "canonical_host_matches_source": hosts_match,
    }

    for name in (
        "robots_policy_checked",
        "robots_allowed",
        "tos_public_page_checked",
        "login_required",
        "member_or_order_data",
        "cart_order_or_pii",
        "anti_bot_bypass_required",
        "structured_data_preferred",
        "json_ld_first",
        "dom_selector_fallback_allowed",
    ):
        summary[name] = flag(name)

    summary.update(
        {
            "structured_data_types": structured_data_types,
            "selector_version": _safe_text(field("selector_version"), 120),
            "crawl_delay_seconds": crawl_delay_seconds,
            "max_requests_per_run": max_requests,
            "public_cache_ttl_hours": _safe_int(field("public_cache_ttl_hours")),
            "evidence_artifact_path": evidence_artifact_path,
            "evidence_artifact_path_safe": _safe_path(
                evidence_artifact_path,
                prefixes=(ARTIFACT_PREFIX,),
                suffixes=(".json",),
            ),
        }
    )

    for name in ("provenance_required", "snapshot_hash_required"):
        summary[name] = flag(name)

    summary["idempotency_key_strategy"] = _safe_text(
        field("idempotency_key_strategy"), 160
    )
    return summary
def _operator_confirmations(payload):
    """Extract the operator confirmation flags from *payload* as booleans.

    Reads ``payload["operator_confirmations"]`` (coerced with ``_as_dict``) and
    returns a dict with a fixed set of confirmation names, each mapped to the
    truthiness of the submitted value. Names that were not submitted come back
    as ``False``; names outside the fixed set are ignored.
    """
    submitted = _as_dict(payload.get("operator_confirmations"))
    confirmation_names = (
        "human_reviewed_source_policy",
        "robots_and_tos_checked_by_operator",
        "public_pages_only",
        "no_login_or_member_data",
        "no_cart_order_or_pii",
        "no_antibot_bypass",
        "structured_data_first",
        "provenance_required",
        "no_api_network_fetch",
        "no_database_write",
        "no_scheduler_attach",
        "no_secret_payload",
    )
    return {name: bool(submitted.get(name)) for name in confirmation_names}
def _governance_summary(payload):
    """Condense an operator governance package into a review summary dict.

    The package is coerced with ``_as_dict``. The summary contains:

    * sanitised identification fields (id, scope, policy/contract version);
    * one ``_source_summary`` per entry in ``sources`` plus aggregate counts,
      the smallest crawl delay and the largest per-run request budget
      (``0.0`` / ``0`` when there are no sources);
    * the operator confirmations;
    * whether raw page content or forbidden secret-like keys were submitted,
      and the list of blocked side-effect findings;
    * the truthiness of a fixed set of side-effect flags, read only from the
      top level of the package.
    """
    package = _as_dict(payload)
    sources = list(map(_source_summary, _as_list(package.get("sources"))))
    blocked_side_effects = _blocked_source_governance_side_effects(package)
    raw_payload_submitted = _contains_raw_payload(package)

    # Key names that must not be mistaken for submitted secrets.
    extra_safe_keys = {
        "no_api_network_fetch",
        "no_secret_payload",
        "source_contract_version",
    }
    secret_or_token_submitted = _contains_forbidden_key(
        package,
        FORBIDDEN_SECRET_KEYS,
        safe_keys=SAFE_SECRET_METADATA_KEYS | extra_safe_keys,
    )

    # Single pass over the source summaries for the aggregate counters.
    platform_codes = set()
    robots_checked_count = 0
    structured_data_ready_count = 0
    for item in sources:
        if item["platform_code"]:
            platform_codes.add(item["platform_code"])
        if item["robots_policy_checked"] and item["robots_allowed"]:
            robots_checked_count += 1
        if item["structured_data_preferred"] and item["json_ld_first"]:
            structured_data_ready_count += 1

    top_level_flag_names = (
        "api_uses_external_network",
        "api_fetches_robots_txt",
        "api_fetches_sitemap",
        "api_fetches_source_url",
        "api_opens_database_connection",
        "api_writes_database",
        "api_writes_file",
        "scheduler_attached",
    )

    return {
        "governance_id": _safe_text(package.get("governance_id"), 160),
        "governance_scope": _safe_text(package.get("governance_scope"), 160),
        "policy_version": _safe_text(package.get("policy_version"), 120),
        "source_contract_version": _safe_text(
            package.get("source_contract_version"), 120
        ),
        "sources": sources,
        "source_count": len(sources),
        "platform_count": len(platform_codes),
        "robots_checked_count": robots_checked_count,
        "structured_data_ready_count": structured_data_ready_count,
        "min_crawl_delay_seconds": min(
            (item["crawl_delay_seconds"] for item in sources), default=0.0
        ),
        "max_requests_per_run": max(
            (item["max_requests_per_run"] for item in sources), default=0
        ),
        "operator_confirmations": _operator_confirmations(package),
        "raw_payload_submitted_to_api": raw_payload_submitted,
        "secret_or_token_submitted_to_api": secret_or_token_submitted,
        "blocked_side_effects": blocked_side_effects,
        **{name: bool(package.get(name)) for name in top_level_flag_names},
    }
def _source_contract():
    """Return the static source-governance contract description.

    The dict carries the contract scope and policy version (taken from
    ``CONTRACT_SCOPE`` / ``SOURCE_POLICY_VERSION``), the list of referenced
    practices (key, display label, reference URL), the source fields the
    contract lists as required, the categories of data it lists as forbidden,
    and the name of the next gate. A fresh structure is built on every call.
    """
    # (key, label, reference_url) rows, expanded into dicts below.
    practice_rows = (
        (
            "robots_exclusion_protocol",
            "先 AI 自動驗證確認 robots.txt / REP不由 API 自動抓取或繞過",
            "https://www.rfc-editor.org/rfc/rfc9309",
        ),
        (
            "sitemap_provenance",
            "以 sitemap / lastmod 作為活動來源發現與更新依據之一",
            "https://www.sitemaps.org/protocol.html",
        ),
        (
            "structured_data_first",
            "優先解析 JSON-LD / schema.org Product、Offer、ItemList",
            "https://developers.google.com/search/docs/appearance/structured-data/product-snippet",
        ),
        (
            "canonical_public_url",
            "所有來源需保留 canonical URL、公開 URL 與 host provenance",
            "https://developers.google.com/search/docs/crawling-indexing/consolidate-duplicate-urls",
        ),
        (
            "bronze_silver_gold",
            "raw evidence、normalized source、reviewed product/match 分層保存",
            "internal:market_intel_lakehouse_contract",
        ),
    )
    mainstream_practices = [
        {"key": key, "label": label, "reference_url": reference_url}
        for key, label, reference_url in practice_rows
    ]

    required_source_fields = [
        "platform_code",
        "source_key",
        "source_url",
        "canonical_url",
        "robots_url",
        "sitemap_url",
        "robots_policy_checked",
        "robots_allowed",
        "structured_data_types",
        "crawl_delay_seconds",
        "max_requests_per_run",
        "evidence_artifact_path",
        "snapshot_hash_required",
        "idempotency_key_strategy",
    ]

    forbidden_data = [
        "login_page",
        "member_profile",
        "cart_or_checkout",
        "order_data",
        "personal_data",
        "cookie_or_token",
        "anti_bot_bypass",
    ]

    return {
        "contract_scope": CONTRACT_SCOPE,
        "policy_version": SOURCE_POLICY_VERSION,
        "mainstream_practices": mainstream_practices,
        "required_source_fields": required_source_fields,
        "forbidden_data": forbidden_data,
        "next_gate": "mcp_fetch_target_review_with_source_governance",
    }
def build_mcp_professional_source_governance_preview(
    *, operator_source_governance=None, phase=None
):
    """Build the review response for an operator source-governance package.

    Args:
        operator_source_governance: The submitted package, or ``None`` when
            nothing was submitted. It is coerced with ``_as_dict`` before
            being summarised.
        phase: Opaque value echoed back under ``"phase"``.

    Returns:
        A dict with the gate results from
        ``build_professional_source_governance_gates``, the keys of the gates
        that did not pass (``blocked_reasons``), the governance summary, the
        static source contract, a sample package, and operator next steps.
        The package counts as accepted only when one was received and every
        gate passed; ``mode`` reflects that. All network / database / file /
        scheduler indicator fields in the response are hard-coded ``False``.
    """
    received = operator_source_governance is not None
    summary = _governance_summary(_as_dict(operator_source_governance))
    gates = build_professional_source_governance_gates(
        package_received=received,
        governance=summary,
    )

    # One pass over the gates: tally the passes, collect the failing keys.
    passed_total = 0
    failed_keys = []
    for gate in gates:
        if gate["passed"]:
            passed_total += 1
        else:
            failed_keys.append(gate["key"])

    accepted = received and not failed_keys
    if accepted:
        mode = "mcp_professional_source_governance"
    else:
        mode = "mcp_professional_source_governance_preview"

    # Indicators that are always reported as False by this builder.
    always_false = dict.fromkeys(
        (
            "ready_for_api_database_write",
            "ready_for_scheduler_attach",
            "network_request_allowed",
            "external_network_executed",
            "api_uses_external_network",
            "api_fetches_robots_txt",
            "api_fetches_sitemap",
            "api_fetches_source_url",
            "api_opens_database_connection",
            "api_writes_database",
            "api_writes_file",
            "database_connection_opened",
            "database_write_executed",
            "database_commit_executed",
            "file_written",
            "payload_persisted",
            "scheduler_attached",
        ),
        False,
    )

    return {
        "mode": mode,
        "phase": phase,
        "source_governance_payload_received": received,
        "mcp_professional_source_governance_accepted": accepted,
        "ready_for_mcp_fetch_source_contract": accepted,
        **always_false,
        "gate_count": len(gates),
        "passed_gate_count": passed_total,
        "blocked_reasons": failed_keys,
        "gates": gates,
        "source_governance_summary": summary,
        "source_contract": _source_contract(),
        "sources": summary["sources"],
        "next_operator_steps": [
            "人工保留 robots / sitemap / ToS / public URL 審核證據。",
            "將通過治理的來源餵給後續 fetch target review不由 API 直接抓外站。",
            "正式 fetch 前仍需 MCP readiness、外部 MCP health、manual run package 與 receipt gate。",
        ],
        "sample_professional_source_governance_package": (
            build_sample_professional_source_governance_package()
        ),
    }